http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
new file mode 100644
index 0000000..a62aedb
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
@@ -0,0 +1,468 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.ScannerBase;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.iterators.FilteringIterator;
+import cloudbase.core.iterators.RegExIterator;
+import cloudbase.core.iterators.filter.AgeOffFilter;
+import cloudbase.core.iterators.filter.RegExFilter;
+import cloudbase.core.security.Authorizations;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterators;
+import mango.collect.CloseableIterable;
+import mango.collect.CloseableIterables;
+import mango.collect.FluentCloseableIterable;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaRange;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.query.BatchRyaQuery;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.api.query.strategy.ByteRange;
+import mvm.rya.api.query.strategy.TriplePatternStrategy;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRowRegex;
+import mvm.rya.api.utils.CloseableIterableIteration;
+import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
+import mvm.rya.iterators.LimitingAgeOffFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.openrdf.query.BindingSet;
+
+import java.io.IOException;
+import java.util.*;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
+
+/**
+ * {@link RyaQueryEngine} that reads Rya statements from Cloudbase: statement patterns are turned
+ * into scanner ranges on the table for the chosen {@link TABLE_LAYOUT}, and the returned
+ * key/value pairs are converted back into {@link RyaStatement}s.
+ * <p>
+ * Date: 7/17/12
+ * Time: 9:28 AM
+ */
+public class CloudbaseRyaQueryEngine implements RyaQueryEngine<CloudbaseRdfConfiguration> {
+
+    private final Log logger = LogFactory.getLog(CloudbaseRyaQueryEngine.class);
+    private CloudbaseRdfConfiguration configuration;
+    private final RyaContext ryaContext = RyaContext.getInstance();
+    private final Connector connector;
+    /** Key/value-to-statement converters, one per table layout this engine reads. */
+    private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>();
+
+    /**
+     * Creates an engine that uses a default {@link CloudbaseRdfConfiguration}.
+     *
+     * @param connector connection used to create scanners
+     */
+    public CloudbaseRyaQueryEngine(Connector connector) {
+        this(connector, new CloudbaseRdfConfiguration());
+    }
+
+    /**
+     * @param connector connection used to create scanners
+     * @param conf      engine-level configuration: the fallback when a query supplies none, and the source of the table layout strategy
+     */
+    public CloudbaseRyaQueryEngine(Connector connector, CloudbaseRdfConfiguration conf) {
+        this.connector = connector;
+        this.configuration = conf;
+
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO));
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO));
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP));
+    }
+
+    /**
+     * Runs a single statement-pattern query by building a {@link RyaQuery} and delegating to {@link #query(RyaQuery)}.
+     *
+     * @param stmt the statement pattern to match
+     * @param conf query configuration; this engine's configuration is used when {@code null}
+     * @return an iteration over the matching statements
+     * @throws RyaDAOException if the query cannot be executed
+     */
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, CloudbaseRdfConfiguration conf) throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+
+        RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build();
+        CloseableIterable<RyaStatement> results = query(ryaQuery);
+
+        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
+    }
+
+    /**
+     * @return {@code ryaType.getData()}, or {@code null} when {@code ryaType} is {@code null}
+     */
+    protected String getData(RyaType ryaType) {
+        return (ryaType != null) ? (ryaType.getData()) : (null);
+    }
+
+    /**
+     * Runs several statement patterns against one table and pairs every returned statement with
+     * each binding set whose pattern range contains the statement's key.
+     * <p>
+     * Only one table is scanned: the layout, context and regex strategy of the <em>last</em>
+     * pattern are applied to every range. A {@link BatchScanner} is used when the number of
+     * distinct ranges exceeds {@code conf.getMaxRangesForScanner()}; otherwise one
+     * {@link Scanner} is opened per range.
+     *
+     * @param stmts statement patterns, each paired with the binding set to attach to its matches
+     * @param conf  query configuration; this engine's configuration is used when {@code null}
+     * @return (statement, binding set) entries; an empty iteration when {@code stmts} is empty
+     * @throws RyaDAOException wrapping any failure, including an unsupported triple pattern
+     */
+    @Override
+    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, CloudbaseRdfConfiguration conf) throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+        //query configuration
+        Authorizations authorizations = conf.getAuthorizations();
+        Long ttl = conf.getTtl();
+        Long maxResults = conf.getLimit();
+        Integer maxRanges = conf.getMaxRangesForScanner();
+        Integer numThreads = conf.getNumThreads();
+
+        //TODO: cannot span multiple tables here
+        try {
+            Collection<Range> ranges = new HashSet<Range>();
+            RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
+            TABLE_LAYOUT layout = null;
+            RyaURI context = null;
+            TriplePatternStrategy strategy = null;
+            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
+                RyaStatement stmt = stmtbs.getKey();
+                context = stmt.getContext(); //TODO: This will be overwritten
+                BindingSet bs = stmtbs.getValue();
+                strategy = ryaContext.retrieveStrategy(stmt);
+                if (strategy == null) {
+                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
+                }
+
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
+                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);
+
+                //use range to set scanner
+                //populate scanner based on authorizations, ttl
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
+                ranges.add(range);
+                rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
+            }
+            if (layout == null) {
+                // No patterns were supplied. Return an empty iteration instead of null so callers
+                // can iterate and close the result without a null check.
+                return new RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT.SPO,
+                        Collections.<Map.Entry<Key, Value>>emptyList().iterator(), rangeMap);
+            }
+            String regexSubject = conf.getRegexSubject();
+            String regexPredicate = conf.getRegexPredicate();
+            String regexObject = conf.getRegexObject();
+            TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
+
+            String table = layoutToTable(layout, conf);
+            boolean useBatchScanner = ranges.size() > maxRanges;
+            RyaStatementBindingSetKeyValueIterator iterator;
+            if (useBatchScanner) {
+                BatchScanner scanner = connector.createBatchScanner(table, authorizations, numThreads);
+                scanner.setRanges(ranges);
+                fillScanner(scanner, context, ttl, tripleRowRegex);
+                iterator = new RyaStatementBindingSetKeyValueIterator(layout, scanner, rangeMap);
+            } else {
+                // One Scanner per range; their iterators are concatenated into a single stream.
+                List<Iterator<Map.Entry<Key, Value>>> iters = new ArrayList<Iterator<Map.Entry<Key, Value>>>(ranges.size());
+                for (Range range : ranges) {
+                    Scanner scanner = connector.createScanner(table, authorizations);
+                    scanner.setRange(range);
+                    fillScanner(scanner, context, ttl, tripleRowRegex);
+                    iters.add(scanner.iterator());
+                }
+                iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters.iterator()), rangeMap);
+            }
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Runs several statement patterns as one {@link BatchRyaQuery}.
+     *
+     * @param stmts statement patterns to match
+     * @param conf  query configuration; this engine's configuration is used when {@code null}
+     * @return an iteration over statements matching any of the patterns
+     * @throws RyaDAOException if the query cannot be executed
+     */
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, CloudbaseRdfConfiguration conf)
+            throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+
+        BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build();
+        CloseableIterable<RyaStatement> results = query(batchRyaQuery);
+
+        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
+    }
+
+    /**
+     * Runs a single-pattern query with one {@link Scanner}.
+     * <p>
+     * When no {@link TriplePatternStrategy} matches the pattern, the SPO table is scanned in full.
+     * The statement's context and qualifier restrict which columns are returned; TTL, current time,
+     * batch size, subject/predicate/object regexes and the result limit come from {@code ryaQuery}.
+     *
+     * @param ryaQuery the query; must not be {@code null} and must carry a statement
+     * @return the matching statements, limited to {@code ryaQuery.getMaxResults()} when set
+     * @throws RyaDAOException wrapping any failure while building or opening the scan
+     * @throws NullPointerException if {@code ryaQuery} or its statement is {@code null}
+     */
+    @Override
+    public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException {
+        Preconditions.checkNotNull(ryaQuery);
+        RyaStatement stmt = ryaQuery.getQuery();
+        Preconditions.checkNotNull(stmt);
+
+        //query configuration
+        String[] auths = ryaQuery.getAuths();
+        Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
+        Long ttl = ryaQuery.getTtl();
+        Long currentTime = ryaQuery.getCurrentTime();
+        Long maxResults = ryaQuery.getMaxResults();
+        Integer batchSize = ryaQuery.getBatchSize();
+        String regexSubject = ryaQuery.getRegexSubject();
+        String regexPredicate = ryaQuery.getRegexPredicate();
+        String regexObject = ryaQuery.getRegexObject();
+        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
+
+        try {
+            //find triple pattern range
+            TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
+            TABLE_LAYOUT layout;
+            Range range;
+            RyaURI subject = stmt.getSubject();
+            RyaURI predicate = stmt.getPredicate();
+            RyaType object = stmt.getObject();
+            RyaURI context = stmt.getContext();
+            String qualifier = stmt.getQualifer();
+            TripleRowRegex tripleRowRegex = null;
+            if (strategy != null) {
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
+                        strategy.defineRange(subject, predicate, object, context, null);
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
+
+                byte[] objectTypeInfo = null;
+                if (object != null) {
+                    //TODO: Not good to serialize this twice
+                    if (object instanceof RyaRange) {
+                        objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
+                    } else {
+                        objectTypeInfo = RyaContext.getInstance().serializeType(object)[1];
+                    }
+                }
+
+                tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo);
+            } else {
+                // No strategy for this pattern: fall back to a full scan of the SPO table.
+                range = new Range();
+                layout = TABLE_LAYOUT.SPO;
+            }
+
+            //use range to set scanner
+            //populate scanner based on authorizations, ttl
+            String table = layoutToTable(layout, tableLayoutStrategy);
+            Scanner scanner = connector.createScanner(table, authorizations);
+            // Level for the next scan-time iterator; incremented after each one is added.
+            int itrLevel = 20;
+            if (context != null && qualifier != null) {
+                scanner.fetchColumn(new Text(context.getData()), new Text(qualifier));
+            } else if (context != null) {
+                scanner.fetchColumnFamily(new Text(context.getData()));
+            } else if (qualifier != null) {
+                scanner.setScanIterators(itrLevel++, RegExIterator.class.getName(), "riq");
+                scanner.setScanIteratorOption("riq", RegExFilter.COLQ_REGEX, qualifier);
+            }
+            if (ttl != null) {
+                scanner.setScanIterators(itrLevel++, FilteringIterator.class.getName(), "fi");
+                scanner.setScanIteratorOption("fi", "0", LimitingAgeOffFilter.class.getName());
+                scanner.setScanIteratorOption("fi", "0." + LimitingAgeOffFilter.TTL, ttl.toString());
+                if (currentTime != null) {
+                    scanner.setScanIteratorOption("fi", "0." + LimitingAgeOffFilter.CURRENT_TIME, currentTime.toString());
+                }
+            }
+            scanner.setRange(range);
+            if (batchSize != null) {
+                scanner.setBatchSize(batchSize);
+            }
+            //TODO: Fill in context regex
+            if (tripleRowRegex != null) {
+                scanner.setScanIterators(itrLevel++, RegExIterator.class.getName(), "ri");
+                scanner.setScanIteratorOption("ri", RegExFilter.ROW_REGEX, tripleRowRegex.getRow());
+            }
+
+            FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerCloseableIterable(scanner))
+                    .transform(keyValueToRyaStatementFunctionMap.get(layout));
+            if (maxResults != null) {
+                results = results.limit(maxResults.intValue());
+            }
+
+            return results;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Runs several statement patterns against one table.
+     * <p>
+     * The layout, context and regex strategy of the <em>last</em> pattern are applied to every
+     * range. A {@link BatchScanner} is used when the number of distinct ranges exceeds
+     * {@code ryaQuery.getMaxRanges()}; otherwise one {@link Scanner} is created per range.
+     *
+     * @param ryaQuery the batch query; must not be {@code null} and must carry statements
+     * @return statements matching any pattern, limited to {@code ryaQuery.getMaxResults()} when set
+     * @throws RyaDAOException wrapping any failure, including an empty pattern list or an
+     *                         unsupported triple pattern
+     */
+    @Override
+    public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException {
+        Preconditions.checkNotNull(ryaQuery);
+        Iterable<RyaStatement> stmts = ryaQuery.getQueries();
+        Preconditions.checkNotNull(stmts);
+
+        //query configuration
+        String[] auths = ryaQuery.getAuths();
+        final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
+        final Long ttl = ryaQuery.getTtl();
+        Long currentTime = ryaQuery.getCurrentTime();
+        Long maxResults = ryaQuery.getMaxResults();
+        Integer batchSize = ryaQuery.getBatchSize();
+        Integer numQueryThreads = ryaQuery.getNumQueryThreads();
+        String regexSubject = ryaQuery.getRegexSubject();
+        String regexPredicate = ryaQuery.getRegexPredicate();
+        String regexObject = ryaQuery.getRegexObject();
+        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
+        int maxRanges = ryaQuery.getMaxRanges();
+
+        //TODO: cannot span multiple tables here
+        try {
+            Collection<Range> ranges = new HashSet<Range>();
+            TABLE_LAYOUT layout = null;
+            RyaURI context = null;
+            TriplePatternStrategy strategy = null;
+            for (RyaStatement stmt : stmts) {
+                context = stmt.getContext(); //TODO: This will be overwritten
+                strategy = ryaContext.retrieveStrategy(stmt);
+                if (strategy == null) {
+                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
+                }
+
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
+                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);
+
+                //use range to set scanner
+                //populate scanner based on authorizations, ttl
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
+                ranges.add(range);
+            }
+            //no ranges
+            if (layout == null) throw new IllegalArgumentException("No table layout specified");
+
+            final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
+
+            final String table = layoutToTable(layout, tableLayoutStrategy);
+            boolean useBatchScanner = ranges.size() > maxRanges;
+            FluentCloseableIterable<RyaStatement> results = null;
+            if (useBatchScanner) {
+                BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads);
+                scanner.setRanges(ranges);
+                fillScanner(scanner, context, ttl, tripleRowRegex);
+                results = FluentCloseableIterable.from(new BatchScannerCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
+            } else {
+                final RyaURI fcontext = context;
+                FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() {
+                    @Override
+                    public Iterable<Map.Entry<Key, Value>> apply(Range range) {
+                        try {
+                            Scanner scanner = connector.createScanner(table, authorizations);
+                            scanner.setRange(range);
+                            fillScanner(scanner, fcontext, ttl, tripleRowRegex);
+                            return scanner;
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }).transform(keyValueToRyaStatementFunctionMap.get(layout));
+
+                results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
+            }
+            if (maxResults != null) {
+                results = results.limit(maxResults.intValue());
+            }
+            return results;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Applies the scan settings shared by the multi-pattern queries to {@code scanner}.
+     *
+     * @param scanner        scanner (or batch scanner) to configure
+     * @param context        when non-null, only this column family is fetched
+     * @param ttl            when non-null, a {@link FilteringIterator} wrapping {@link AgeOffFilter} is set at level 9
+     * @param tripleRowRegex when non-null, a row-regex {@link RegExIterator} is set at level 11
+     * @throws IOException if configuring the scan iterators fails
+     */
+    protected void fillScanner(ScannerBase scanner, RyaURI context, Long ttl, TripleRowRegex tripleRowRegex) throws IOException {
+        if (context != null) {
+            scanner.fetchColumnFamily(new Text(context.getData()));
+        }
+        if (ttl != null) {
+            scanner.setScanIterators(9, FilteringIterator.class.getName(), "fi");
+            scanner.setScanIteratorOption("fi", "0", AgeOffFilter.class.getName());
+            scanner.setScanIteratorOption("fi", "0.ttl", ttl.toString());
+        }
+        if (tripleRowRegex != null) {
+            scanner.setScanIterators(11, RegExIterator.class.getName(), "ri");
+            scanner.setScanIteratorOption("ri", RegExFilter.ROW_REGEX, tripleRowRegex.getRow());
+        }
+    }
+
+    @Override
+    public void setConf(CloudbaseRdfConfiguration conf) {
+        this.configuration = conf;
+    }
+
+    @Override
+    public CloudbaseRdfConfiguration getConf() {
+        return configuration;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
new file mode 100644
index 0000000..bb92c23
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
@@ -0,0 +1,58 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.base.Function;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Guava {@link Function} that deserializes one Cloudbase key/value pair, read from a table of
+ * the configured layout, into a {@link RyaStatement}.
+ * <p>
+ * Date: 1/30/13
+ * Time: 2:09 PM
+ */
+public class KeyValueToRyaStatementFunction implements Function<Map.Entry<Key, Value>, RyaStatement> {
+
+    /** Layout of the table the key/value pairs come from; passed to the triple deserializer. */
+    private final TABLE_LAYOUT tableLayout;
+
+    /**
+     * @param tableLayout layout of the table whose entries this function will convert
+     */
+    public KeyValueToRyaStatementFunction(TABLE_LAYOUT tableLayout) {
+        this.tableLayout = tableLayout;
+    }
+
+    /**
+     * Decodes {@code input} into a statement; a {@code null} value is passed on as {@code null} value bytes.
+     *
+     * @param input key/value pair read from the table
+     * @return the deserialized statement
+     * @throws RuntimeException wrapping a {@link TripleRowResolverException} if the row cannot be decoded
+     */
+    @Override
+    public RyaStatement apply(Map.Entry<Key, Value> input) {
+        final Key k = input.getKey();
+        final Value v = input.getValue();
+        final byte[] valueBytes = (v == null) ? null : v.get();
+        try {
+            final TripleRow row = new TripleRow(k.getRowData().toArray(),
+                    k.getColumnFamilyData().toArray(),
+                    k.getColumnQualifierData().toArray(),
+                    k.getTimestamp(),
+                    k.getColumnVisibilityData().toArray(),
+                    valueBytes);
+            return RyaContext.getInstance().deserializeTriple(tableLayout, row);
+        } catch (TripleRowResolverException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
new file mode 100644
index 0000000..69c6147
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
@@ -0,0 +1,56 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import org.openrdf.query.BindingSet;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Associates key ranges with the binding sets that produced them, so that a scanned key can be
+ * mapped back to every binding set whose range contains it.
+ * <p>
+ * Class RangeBindingSetCollection
+ * Date: Feb 23, 2011
+ * Time: 10:15:48 AM
+ */
+public class RangeBindingSetEntries {
+    /** (range, binding set) pairs; public and mutable, callers add entries directly. */
+    public Collection<Map.Entry<Range, BindingSet>> ranges;
+
+    /** Creates an instance backed by a new, empty {@link ArrayList}. */
+    public RangeBindingSetEntries() {
+        this(new ArrayList<Map.Entry<Range, BindingSet>>());
+    }
+
+    /**
+     * @param ranges backing collection, stored as-is (not copied)
+     */
+    public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> ranges) {
+        this.ranges = ranges;
+    }
+
+    /**
+     * Collects the binding sets whose range contains {@code key}, in the iteration order of
+     * {@link #ranges}. Despite its name, this returns the matches rather than a boolean; every
+     * entry is checked on each call.
+     *
+     * @param key the scanned key to look up
+     * @return a new, possibly empty, collection of matching binding sets
+     */
+    public Collection<BindingSet> containsKey(Key key) {
+        //TODO: need to find a better way to sort these and pull
+        //TODO: maybe fork/join here
+        Collection<BindingSet> matches = new ArrayList<BindingSet>();
+        for (Map.Entry<Range, BindingSet> candidate : ranges) {
+            Range candidateRange = candidate.getKey();
+            if (!candidateRange.contains(key)) {
+                continue;
+            }
+            matches.add(candidate.getValue());
+        }
+        return matches;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
----------------------------------------------------------------------
diff --git
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
new file mode 100644
index 0000000..cef9eb6
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
@@ -0,0 +1,161 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.ScannerBase;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.cloudbase.BatchScannerIterator;
+import org.openrdf.query.BindingSet;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Iterates (statement, binding set) pairs: each key/value pair read from the table is decoded
+ * into a {@link RyaStatement} and emitted once for every binding set whose range (per
+ * {@link RangeBindingSetEntries#containsKey}) contains its key. Keys that match no binding set
+ * are skipped.
+ * <p>
+ * Date: 7/17/12
+ * Time: 11:48 AM
+ */
+public class RyaStatementBindingSetKeyValueIterator implements CloseableIteration<Map.Entry<RyaStatement, BindingSet>, RyaDAOException> {
+    /** Source of key/value pairs; {@code null} once closed. */
+    private Iterator<Map.Entry<Key, Value>> dataIterator;
+    private final TABLE_LAYOUT tableLayout;
+    /** Remaining results; a negative value means unlimited. */
+    private Long maxResults = -1L;
+    private ScannerBase scanner;
+    private boolean isBatchScanner;
+    private final RangeBindingSetEntries rangeMap;
+    /** Binding sets still to be paired with {@link #statement}. */
+    private Iterator<BindingSet> bsIter;
+    /** Statement decoded from the most recently read key/value pair. */
+    private RyaStatement statement;
+
+    /**
+     * Reads from {@code scannerBase}; when it is a {@link BatchScanner}, {@link #close()} closes it.
+     */
+    public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, ScannerBase scannerBase, RangeBindingSetEntries rangeMap) {
+        this(tableLayout, ((scannerBase instanceof BatchScanner) ? new BatchScannerIterator(((BatchScanner) scannerBase).iterator()) : ((Scanner) scannerBase).iterator()), rangeMap);
+        this.scanner = scannerBase;
+        isBatchScanner = scanner instanceof BatchScanner;
+    }
+
+    /**
+     * Reads from an arbitrary key/value iterator; {@link #close()} only detaches it.
+     */
+    public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, Iterator<Map.Entry<Key, Value>> dataIterator, RangeBindingSetEntries rangeMap) {
+        this.tableLayout = tableLayout;
+        this.rangeMap = rangeMap;
+        this.dataIterator = dataIterator;
+    }
+
+    @Override
+    public void close() throws RyaDAOException {
+        dataIterator = null;
+        if (scanner != null && isBatchScanner) {
+            ((BatchScanner) scanner).close();
+        }
+    }
+
+    public boolean isClosed() throws RyaDAOException {
+        return dataIterator == null;
+    }
+
+    /**
+     * Reports whether another (statement, binding set) pair is available. Unlike a plain
+     * {@code dataIterator.hasNext()} check, this reads ahead past keys that match no binding set,
+     * so a {@code true} result guarantees that {@link #next()} returns a non-null entry.
+     *
+     * @throws RyaDAOException if the iterator is closed or a row cannot be decoded
+     */
+    @Override
+    public boolean hasNext() throws RyaDAOException {
+        if (isClosed()) {
+            throw new RyaDAOException("Closed Iterator");
+        }
+        if (maxResults == 0) {
+            return false;
+        }
+        if (advance()) {
+            return true;
+        }
+        // Source exhausted: pin the counter to 0 so later calls short-circuit.
+        maxResults = 0L;
+        return false;
+    }
+
+    /**
+     * Returns the next (statement, binding set) pair, or {@code null} when none remain.
+     */
+    @Override
+    public Map.Entry<RyaStatement, BindingSet> next() throws RyaDAOException {
+        if (!hasNext()) {
+            return null;
+        }
+        maxResults--;
+        return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, bsIter.next());
+    }
+
+    /**
+     * Reads key/value pairs until {@link #bsIter} has a pending binding set.
+     *
+     * @return {@code true} if a binding set is pending, {@code false} if the source is exhausted
+     */
+    private boolean advance() throws RyaDAOException {
+        while (bsIter == null || !bsIter.hasNext()) {
+            if (!dataIterator.hasNext()) {
+                return false;
+            }
+            Map.Entry<Key, Value> next = dataIterator.next();
+            Key key = next.getKey();
+            Value value = next.getValue();
+            // Guard against a null value instead of dereferencing it while building the row.
+            byte[] valueBytes = (value != null) ? value.get() : null;
+            try {
+                statement = RyaContext.getInstance().deserializeTriple(tableLayout,
+                        new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
+                                key.getTimestamp(), key.getColumnVisibilityData().toArray(), valueBytes));
+            } catch (TripleRowResolverException e) {
+                throw new RyaDAOException(e);
+            }
+            if (value != null) {
+                statement.setValue(valueBytes);
+            }
+            bsIter = rangeMap.containsKey(key).iterator();
+        }
+        return true;
+    }
+
+    /**
+     * NOTE(review): advances past the next element rather than removing the last one returned;
+     * kept as-is for compatibility. Confirm whether any caller relies on this.
+     */
+    @Override
+    public void remove() throws RyaDAOException {
+        next();
+    }
+
+    public Long getMaxResults() {
+        return maxResults;
+    }
+
+    /**
+     * @param maxResults maximum number of pairs to return; a negative value means unlimited
+     */
+    public void setMaxResults(Long maxResults) {
+        this.maxResults = maxResults;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
new file mode 100644
index 0000000..602affe
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
@@ -0,0 +1,107 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Adapts a Cloudbase key/value iterator to a {@link CloseableIteration} of {@link RyaStatement}s,
+ * decoding each entry according to the table layout and optionally capping the number of results.
+ * <p>
+ * Date: 7/17/12
+ * Time: 11:48 AM
+ */
+public class RyaStatementKeyValueIterator implements CloseableIteration<RyaStatement, RyaDAOException> {
+    /** Source of key/value pairs; {@code null} once closed. */
+    private Iterator<Map.Entry<Key, Value>> dataIterator;
+    private final TABLE_LAYOUT tableLayout;
+    /** Remaining results; a negative value means unlimited. */
+    private Long maxResults = -1L;
+
+    public RyaStatementKeyValueIterator(TABLE_LAYOUT tableLayout, Iterator<Map.Entry<Key, Value>> dataIterator) {
+        this.tableLayout = tableLayout;
+        this.dataIterator = dataIterator;
+    }
+
+    /** Detaches the source iterator; nothing else is released. */
+    @Override
+    public void close() throws RyaDAOException {
+        dataIterator = null;
+    }
+
+    public boolean isClosed() throws RyaDAOException {
+        return dataIterator == null;
+    }
+
+    /**
+     * @throws RyaDAOException if the iterator has been closed
+     */
+    @Override
+    public boolean hasNext() throws RyaDAOException {
+        if (isClosed()) {
+            throw new RyaDAOException("Closed Iterator");
+        }
+        return maxResults != 0 && dataIterator.hasNext();
+    }
+
+    /**
+     * Decodes the next entry into a statement.
+     *
+     * @return the next statement, or {@code null} when none remain
+     * @throws RyaDAOException if closed or if the entry cannot be decoded
+     */
+    @Override
+    public RyaStatement next() throws RyaDAOException {
+        if (!hasNext()) {
+            return null;
+        }
+
+        try {
+            Map.Entry<Key, Value> next = dataIterator.next();
+            Key key = next.getKey();
+            Value value = next.getValue();
+            // Guard against a null value instead of dereferencing it while building the row.
+            byte[] valueBytes = (value != null) ? value.get() : null;
+            RyaStatement statement = RyaContext.getInstance().deserializeTriple(tableLayout,
+                    new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
+                            key.getTimestamp(), key.getColumnVisibilityData().toArray(), valueBytes));
+            if (value != null) {
+                statement.setValue(valueBytes);
+            }
+            maxResults--;
+            return statement;
+        } catch (TripleRowResolverException e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * NOTE(review): advances past the next element rather than removing the last one returned;
+     * kept as-is for compatibility. Confirm whether any caller relies on this.
+     */
+    @Override
+    public void remove() throws RyaDAOException {
+        next();
+    }
+
+    public Long getMaxResults() {
+        return maxResults;
+    }
+
+    /**
+     * @param maxResults maximum number of statements to return; a negative value means unlimited
+     */
+    public void setMaxResults(Long maxResults) {
+        this.maxResults = maxResults;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
new file mode 100644
index 0000000..f9a51fc
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
@@ -0,0 +1,42 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.Scanner;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.base.Preconditions;
+import mango.collect.AbstractCloseableIterable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Exposes a {@link Scanner} as a closeable iterable of key/value pairs;
+ * {@code retrieveIterator()} delegates to {@code scanner.iterator()}.
+ * <p>
+ * Date: 1/30/13
+ * Time: 2:15 PM
+ */
+public class ScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
+
+    protected Scanner scanner;
+
+    /**
+     * @param scanner the scanner to read from; must not be {@code null}
+     * @throws NullPointerException if {@code scanner} is {@code null}
+     */
+    public ScannerCloseableIterable(Scanner scanner) {
+        Preconditions.checkNotNull(scanner);
+        this.scanner = scanner;
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // Nothing to release: this class only holds the Scanner and never closes it.
+    }
+
+    @Override
+    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
+        return scanner.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
new file mode 100644
index 0000000..2c1ea55
--- /dev/null
+++ b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
@@ -0,0 +1,77 @@
+//package mvm.mmrts.cloudbase;
+//
+//import cloudbase.core.CBConstants;
+//import cloudbase.core.client.BatchScanner;
+//import cloudbase.core.client.Connector;
+//import cloudbase.core.client.Scanner;
+//import cloudbase.core.client.ZooKeeperInstance;
+//import cloudbase.core.client.impl.MasterClient;
+//import cloudbase.core.data.Key;
+//import cloudbase.core.data.Range;
+//import cloudbase.core.data.Value;
+//import cloudbase.core.master.thrift.MasterClientService;
+//import cloudbase.core.master.thrift.MasterMonitorInfo;
+//import cloudbase.core.master.thrift.TableInfo;
+//import cloudbase.core.master.thrift.TabletServerStatus;
+//import cloudbase.core.security.thrift.AuthInfo;
+//import mvm.rya.cloudbase.utils.pri.PriorityIterator;
+//import org.apache.hadoop.io.Text;
+//
+//import java.util.Collections;
+//import
java.util.Iterator; +//import java.util.List; +//import java.util.Map; +// +///** +// * Created by IntelliJ IDEA. +// * User: RoshanP +// * Date: 3/28/12 +// * Time: 5:32 PM +// * To change this template use File | Settings | File Templates. +// */ +//public class CloudbaseResearchMain { +// +// +// public static void main(String[] args) { +// try { +// ZooKeeperInstance instance = new ZooKeeperInstance("stratus", "stratus13:2181"); +// +// MasterClientService.Iface client = MasterClient.getConnection(instance, false); +// MasterMonitorInfo mmi = client.getMasterStats(null, new AuthInfo("root", "password".getBytes(), "stratus")); +// +// List<TabletServerStatus> tServerInfo = mmi.getTServerInfo(); +// for (TabletServerStatus tstatus : tServerInfo) { +// System.out.println(tstatus.getName()); +// System.out.println(tstatus.getOsLoad()); +// Map<String, TableInfo> tableMap = tstatus.getTableMap(); +// double ingestRate = 0; +// double queryRate = 0; +// for (Map.Entry<String, TableInfo> entry : tableMap.entrySet()) { +// String tableName = entry.getKey(); +// TableInfo tableInfo = entry.getValue(); +// ingestRate += tableInfo.getIngestRate(); +// queryRate += tableInfo.getQueryRate(); +// } +// System.out.println(ingestRate); +// System.out.println(queryRate); +// } +// +// Connector connector = instance.getConnector("root", "password".getBytes()); +//// BatchScanner scanner = connector.createBatchScanner("l_spo", CBConstants.NO_AUTHS, 10); +//// scanner.setRanges(Collections.singleton(new Range(new Text("\0"), new Text("\uFFFD")))); +// Scanner scanner = connector.createScanner("l_spo", CBConstants.NO_AUTHS); +// scanner.setScanIterators(20, PriorityIterator.class.getName(), "pi"); +// Iterator<Map.Entry<Key,Value>> iter = scanner.iterator(); +// int count = 0; +// while(iter.hasNext()) { +// iter.next(); +// System.out.println(count++); +//// if(count == 100) break; +// } +//// scanner.close(); +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } 
+//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java new file mode 100644 index 0000000..af3e9ab --- /dev/null +++ b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java @@ -0,0 +1,588 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.Scanner; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.client.mock.MockInstance; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.cloudbase.query.CloudbaseRyaQueryEngine; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.*; + +/** + * Class CloudbaseRyaDAOTest + * Date: Mar 7, 2012 + * Time: 9:42:28 AM + */ +public class CloudbaseRyaDAOTest { + + private CloudbaseRyaDAO dao; + private ValueFactory vf = new ValueFactoryImpl(); + static String litdupsNS = "urn:test:litdups#"; + private Connector connector; + private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); + + @Before + public void setUp() throws Exception { + dao = 
new CloudbaseRyaDAO(); + connector = new MockInstance().getConnector("", ""); + dao.setConnector(connector); + dao.setConf(conf); + dao.init(); + } + + @After + public void tearDown() throws Exception { + //dao.purge(conf); + dao.destroy(); + } + + @Test + public void testAdd() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + dao.add(new RyaStatement(cpu, loadPerc, uri1)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); + int count = 0; + while (iter.hasNext()) { + assertTrue(uri1.equals(iter.next().getObject())); + count++; + } + iter.close(); + assertEquals(1, count); + + dao.delete(new RyaStatement(cpu, loadPerc, uri1), conf); + + iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); + count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(0, count); + } + + @Test + public void testMaxResults() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri3"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri4"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"))); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + long limit = 3l; + queryConf.setLimit(limit); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); + int count = 0; + while (iter.hasNext()) { + 
iter.next().getObject(); + count++; + } + iter.close(); + assertEquals(limit, count); + } + + @Test + public void testTTL() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + long current = System.currentTimeMillis(); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"), null, null, null, null, current)); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"), null, null, null, null, current - 1000l)); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri3"), null, null, null, null, current - 2000l)); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri4"), null, null, null, null, current - 3000l)); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"), null, null, null, null, current - 4000l)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setTtl(3000l); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); + int count = 0; + while (iter.hasNext()) { + iter.next().getObject(); + count++; + } + iter.close(); + assertEquals(3, count); + + queryConf.setStartTime(current - 3000l); + iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); + count = 0; + while (iter.hasNext()) { + iter.next().getObject(); + count++; + } + iter.close(); + assertEquals(2, count); + } + + @Test + public void testPredRegex() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI loadPerc2 = new RyaURI(litdupsNS + "loadPerc2"); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"))); + dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri3"))); + 
dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri4"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"))); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setRegexPredicate(litdupsNS + "loadPerc[2]"); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, null, null), queryConf); + int count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(2, count); + } + + @Test + public void testSubjectRegex() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI cpu2 = new RyaURI(litdupsNS + "cpu2"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI loadPerc2 = new RyaURI(litdupsNS + "loadPerc2"); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"))); + dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri3"))); + dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri4"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"))); + dao.add(new RyaStatement(cpu2, loadPerc, new RyaURI(litdupsNS + "uri5"))); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setRegexSubject(cpu.getData()); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(null, loadPerc, null), queryConf); + int count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(3, count); + } + + @Test + public void testFullTableScan() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI cpu2 = new RyaURI(litdupsNS + "cpu2"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI loadPerc2 = new 
RyaURI(litdupsNS + "loadPerc2"); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"))); + dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri3"))); + dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + "uri4"))); + dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"))); + dao.add(new RyaStatement(cpu2, loadPerc, new RyaURI(litdupsNS + "uri5"))); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setRegexSubject(cpu.getData()); + + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(null, null, null), queryConf); + int count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(7, count); //includes the rts:version + } + + @Test + public void testAddValue() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + String myval = "myval"; + dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, null, myval.getBytes())); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); + assertTrue(iter.hasNext()); + assertEquals(myval, new String(iter.next().getValue())); + iter.close(); + } + + @Test + public void testDeleteDiffVisibility() throws Exception { + RyaURI cpu = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cpu")); + RyaURI loadPerc = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc")); + RyaURI uri1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "uri1")); + RyaStatement stmt1 = new RyaStatement(cpu, loadPerc, uri1, null, "1", "vis1".getBytes()); + dao.add(stmt1); + 
RyaStatement stmt2 = new RyaStatement(cpu, loadPerc, uri1, null, "2", "vis2".getBytes()); + dao.add(stmt2); + + CloudbaseRdfConfiguration cloneConf = conf.clone(); + cloneConf.setAuth("vis1,vis2"); + + CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), cloneConf); + int count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(2, count); + + dao.delete(stmt1, cloneConf); + + iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), cloneConf); + count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(1, count); + } + + @Test + public void testAddCv() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); + RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); + byte[] colVisABC = "A|B|C".getBytes(); + byte[] colVisAB = "A|B".getBytes(); + byte[] colVisA = "A".getBytes(); + dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, colVisABC)); + dao.add(new RyaStatement(cpu, loadPerc, uri2, null, null, colVisAB)); + dao.add(new RyaStatement(cpu, loadPerc, uri3, null, null, colVisA)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + + //query with no auth + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + assertEquals(0, count); + iter.close(); + + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(); + queryConf.setAuth("B"); + iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); + count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(2, count); + + queryConf.setAuth("A"); + iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); + count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + iter.close(); + assertEquals(3, count); + } + + @Test + public void testGetNamespace() throws Exception { + dao.addNamespace("ns", litdupsNS); + assertEquals(litdupsNS, dao.getNamespace("ns")); + dao.removeNamespace("ns"); + assertNull(dao.getNamespace("ns")); + } + + //TOOD: Add test for set of queries + @Test + public void testQuery() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); + RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); + RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); + RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); + RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); + dao.add(new RyaStatement(cpu, loadPerc, uri1)); + dao.add(new RyaStatement(cpu, loadPerc, uri2)); + dao.add(new RyaStatement(cpu, loadPerc, uri3)); + dao.add(new RyaStatement(cpu, loadPerc, uri4)); + dao.add(new RyaStatement(cpu, loadPerc, uri5)); + dao.add(new RyaStatement(cpu, loadPerc, uri6)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + + Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList(); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri1), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri2), null)); + CloseableIteration<? 
extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, conf); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(2, count); + + //now use batchscanner + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setMaxRangesForScanner(2); + + coll = new ArrayList(); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri1), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri2), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri3), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri4), null)); + iter = queryEngine.queryWithBindingSet(coll, queryConf); + assertTrue(iter.hasNext()); //old code had a weird behaviour that could not perform hasNext consecutively + assertTrue(iter.hasNext()); + assertTrue(iter.hasNext()); + count = 0; + while (iter.hasNext()) { + count++; + assertTrue(iter.hasNext()); + iter.next(); + } + iter.close(); + assertEquals(4, count); + } + + @Test + public void testQueryCollectionRegex() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); + RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); + RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); + RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); + RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); + dao.add(new RyaStatement(cpu, loadPerc, uri1)); + dao.add(new RyaStatement(cpu, loadPerc, uri2)); + dao.add(new RyaStatement(cpu, loadPerc, uri3)); + dao.add(new RyaStatement(cpu, loadPerc, uri4)); + dao.add(new RyaStatement(cpu, loadPerc, uri5)); + dao.add(new RyaStatement(cpu, loadPerc, uri6)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + 
+ Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList(); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri1), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri2), null)); + conf.setRegexPredicate(loadPerc.getData()); + CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, conf); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(2, count); + + conf.setRegexPredicate("notLoadPerc"); + iter = queryEngine.queryWithBindingSet(coll, conf); + count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(0, count); + } + + @Test + public void testQueryCollectionRegexWBatchScanner() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); + RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); + RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); + RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); + RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); + dao.add(new RyaStatement(cpu, loadPerc, uri1)); + dao.add(new RyaStatement(cpu, loadPerc, uri2)); + dao.add(new RyaStatement(cpu, loadPerc, uri3)); + dao.add(new RyaStatement(cpu, loadPerc, uri4)); + dao.add(new RyaStatement(cpu, loadPerc, uri5)); + dao.add(new RyaStatement(cpu, loadPerc, uri6)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration(conf); + queryConf.setMaxRangesForScanner(1); + + Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList(); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, uri1), null)); + coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new RyaStatement(null, loadPerc, 
uri2), null)); + conf.setRegexPredicate(loadPerc.getData()); + CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, queryConf); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(2, count); + + queryConf.setRegexPredicate("notLoadPerc"); + iter = queryEngine.queryWithBindingSet(coll, queryConf); + count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(0, count); + } + + @Test + public void testLiteralTypes() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaType longLit = new RyaType(XMLSchema.LONG, "3"); + + dao.add(new RyaStatement(cpu, loadPerc, longLit)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + + CloseableIteration<RyaStatement, RyaDAOException> query = queryEngine.query(new RyaStatement(cpu, null, null), conf); + assertTrue(query.hasNext()); + RyaStatement next = query.next(); + assertEquals(new Long(longLit.getData()), new Long(next.getObject().getData())); + query.close(); + + RyaType doubleLit = new RyaType(XMLSchema.DOUBLE, "2.0"); + + dao.add(new RyaStatement(cpu, loadPerc, doubleLit)); + + query = queryEngine.query(new RyaStatement(cpu, loadPerc, doubleLit), conf); + assertTrue(query.hasNext()); + next = query.next(); + assertEquals(Double.parseDouble(doubleLit.getData()), Double.parseDouble(next.getObject().getData()), 0.001); + query.close(); + } + + @Test + public void testSameLiteralStringTypes() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaType longLit = new RyaType(XMLSchema.LONG, "10"); + RyaType strLit = new RyaType(XMLSchema.STRING, new String(RyaContext.getInstance().serializeType(longLit)[0])); + + RyaStatement expected = new RyaStatement(cpu, loadPerc, longLit); + dao.add(expected); + 
dao.add(new RyaStatement(cpu, loadPerc, strLit)); + + CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine(); + + CloseableIteration<RyaStatement, RyaDAOException> query = queryEngine.query(new RyaStatement(cpu, loadPerc, longLit), conf); + assertTrue(query.hasNext()); + RyaStatement next = query.next(); + assertEquals(new Long(longLit.getData()), new Long(next.getObject().getData())); + assertEquals(longLit.getDataType(), next.getObject().getDataType()); + assertFalse(query.hasNext()); + query.close(); + } + + @Test + @Ignore("Purge does not work with the batch deleter in mock cloudbase being null") + public void testPurge() throws RyaDAOException, TableNotFoundException { + dao.add(newRyaStatement()); + assertFalse("table should not be empty", areTablesEmpty()); + + dao.purge(conf); + assertTrue("table should be empty", areTablesEmpty()); + //assertNotNull(dao.getVersion()); + } + + @Test + @Ignore("Purge does not work with the batch deleter in mock cloudbase being null") + public void testPurgeDoesNotBreakBatchWriters() throws TableNotFoundException, RyaDAOException { + dao.purge(conf); + assertTrue("table should be empty", areTablesEmpty()); + + dao.add(newRyaStatement()); + assertFalse("table should not be empty", areTablesEmpty()); + } + + @Test + public void testDropAndDestroy() throws RyaDAOException { + assertTrue(dao.isInitialized()); + dao.dropAndDestroy(); + for (String tableName : dao.getTables()) { + assertFalse(tableExists(tableName)); + } + assertFalse(dao.isInitialized()); + } + + private boolean areTablesEmpty() throws TableNotFoundException { + for (String table : dao.getTables()) { + if (tableExists(table)) { + // TODO: filter out version + if (createScanner(table).iterator().hasNext()) { + return false; + } + } + } + return true; + } + + private boolean tableExists(String tableName) { + return connector.tableOperations().exists(tableName); + } + + private Scanner createScanner(String tableName) throws TableNotFoundException { + return 
dao.getConnector().createScanner(tableName, conf.getAuthorizations()); + } + + private RyaStatement newRyaStatement() { + RyaURI subject = new RyaURI(litdupsNS + randomString()); + RyaURI predicate = new RyaURI(litdupsNS + randomString()); + RyaType object = new RyaType(randomString()); + + return new RyaStatement(subject, predicate, object); + } + + private String randomString() { + return UUID.randomUUID().toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java new file mode 100644 index 0000000..ddff532 --- /dev/null +++ b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java @@ -0,0 +1,242 @@ +//package mvm.rya.cloudbase; +// +//import cloudbase.core.data.Range; +//import junit.framework.TestCase; +//import mvm.rya.api.domain.RangeValue; +//import mvm.rya.cloudbase.query.DefineTripleQueryRangeFactory; +//import org.openrdf.model.URI; +//import org.openrdf.model.Value; +//import org.openrdf.model.ValueFactory; +//import org.openrdf.model.impl.ValueFactoryImpl; +// +//import java.util.Map; +// +//import static mvm.rya.api.RdfCloudTripleStoreConstants.*; +// +///** +// */ +//public class DefineTripleQueryRangeFactoryTest extends TestCase { +// +// public static final String DELIM_BYTES_STR = new String(DELIM_BYTES); +// public static final String URI_MARKER_STR = "\u0007"; +// public static final String RANGE_ENDKEY_SUFFIX = "\u0000"; +// DefineTripleQueryRangeFactory factory = new DefineTripleQueryRangeFactory(); +// ValueFactory vf = ValueFactoryImpl.getInstance(); +// static String litdupsNS = "urn:test:litdups#"; +// +// private 
CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); +// +// public void testSPOCases() throws Exception { +// URI cpu = vf.createURI(litdupsNS, "cpu"); +// URI loadPerc = vf.createURI(litdupsNS, "loadPerc"); +// URI obj = vf.createURI(litdupsNS, "uri1"); +// +// //spo +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(cpu, loadPerc, obj, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// String expected_start = URI_MARKER_STR + cpu.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + loadPerc.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// +// //sp +// entry = factory.defineRange(cpu, loadPerc, null, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// expected_start = URI_MARKER_STR + cpu.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + loadPerc.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //s +// entry = factory.defineRange(cpu, null, null, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// expected_start = URI_MARKER_STR + cpu.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //all +// entry = factory.defineRange(null, null, null, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// assertEquals("", +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(new String(new byte[]{Byte.MAX_VALUE}) + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// 
entry.getValue().getEndKey().getRow().toString()); +// } +// +// public void testSPOCasesWithRanges() throws Exception { +// URI subj_start = vf.createURI(litdupsNS, "subj_start"); +// URI subj_end = vf.createURI(litdupsNS, "subj_stop"); +// URI pred_start = vf.createURI(litdupsNS, "pred_start"); +// URI pred_end = vf.createURI(litdupsNS, "pred_stop"); +// URI obj_start = vf.createURI(litdupsNS, "obj_start"); +// URI obj_end = vf.createURI(litdupsNS, "obj_stop"); +// +// Value subj = new RangeValue(subj_start, subj_end); +// Value pred = new RangeValue(pred_start, pred_end); +// Value obj = new RangeValue(obj_start, obj_end); +// +// //spo - o has range +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(subj_start, pred_start, obj, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// String expected_start = URI_MARKER_STR + subj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// String expected_end = URI_MARKER_STR + subj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //sp - p has range +// entry = factory.defineRange(subj_start, pred, null, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// expected_start = URI_MARKER_STR + subj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + pred_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// expected_end = URI_MARKER_STR + subj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + pred_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// 
entry.getValue().getEndKey().getRow().toString()); +// +// //s - s has range +// entry = factory.defineRange(subj, null, null, conf); +// assertEquals(TABLE_LAYOUT.SPO, entry.getKey()); +// expected_start = URI_MARKER_STR + subj_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// expected_end = URI_MARKER_STR + subj_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// } +// +// public void testPOCases() throws Exception { +// URI loadPerc = vf.createURI(litdupsNS, "loadPerc"); +// URI obj = vf.createURI(litdupsNS, "uri1"); +// +// //po +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(null, loadPerc, obj, conf); +// assertEquals(TABLE_LAYOUT.PO, entry.getKey()); +// String expected_start = URI_MARKER_STR + loadPerc.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //p +// entry = factory.defineRange(null, loadPerc, null, conf); +// assertEquals(TABLE_LAYOUT.PO, entry.getKey()); +// expected_start = URI_MARKER_STR + loadPerc.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// } +// +// public void testPOCasesWithRanges() throws Exception { +// URI pred_start = vf.createURI(litdupsNS, "pred_start"); +// URI pred_end = vf.createURI(litdupsNS, "pred_stop"); +// URI obj_start = vf.createURI(litdupsNS, "obj_start"); +// URI obj_end = vf.createURI(litdupsNS, "obj_stop"); +// +// Value pred = new RangeValue(pred_start, pred_end); +// Value obj = new RangeValue(obj_start, obj_end); +// +// 
//po +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(null, pred_start, obj, conf); +// assertEquals(TABLE_LAYOUT.PO, entry.getKey()); +// String expected_start = URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// String expected_end = URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + obj_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //p +// entry = factory.defineRange(null, pred, null, conf); +// assertEquals(TABLE_LAYOUT.PO, entry.getKey()); +// expected_start = URI_MARKER_STR + pred_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// expected_end = URI_MARKER_STR + pred_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// } +// +// public void testOSPCases() throws Exception { +// URI cpu = vf.createURI(litdupsNS, "cpu"); +// URI obj = vf.createURI(litdupsNS, "uri1"); +// +// //so +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(cpu, null, obj, conf); +// assertEquals(TABLE_LAYOUT.OSP, entry.getKey()); +// String expected_start = URI_MARKER_STR + obj.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + cpu.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //o +// entry = factory.defineRange(null, null, obj, conf); +// assertEquals(TABLE_LAYOUT.OSP, entry.getKey()); +// expected_start = URI_MARKER_STR + obj.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); 
+// assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// } +// +// +// public void testOSPCasesWithRanges() throws Exception { +// URI subj_start = vf.createURI(litdupsNS, "subj_start"); +// URI subj_end = vf.createURI(litdupsNS, "subj_stop"); +// URI obj_start = vf.createURI(litdupsNS, "obj_start"); +// URI obj_end = vf.createURI(litdupsNS, "obj_stop"); +// +// Value subj = new RangeValue(subj_start, subj_end); +// Value obj = new RangeValue(obj_start, obj_end); +// +// //so - s should be the range +// Map.Entry<TABLE_LAYOUT, Range> entry = +// factory.defineRange(subj, null, obj_start, conf); +// assertEquals(TABLE_LAYOUT.OSP, entry.getKey()); +// String expected_start = URI_MARKER_STR + obj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + subj_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// String expected_end = URI_MARKER_STR + obj_start.stringValue() + DELIM_BYTES_STR + +// URI_MARKER_STR + subj_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// +// //o - o is range +// entry = factory.defineRange(null, null, obj, conf); +// assertEquals(TABLE_LAYOUT.OSP, entry.getKey()); +// expected_start = URI_MARKER_STR + obj_start.stringValue(); +// assertEquals(expected_start, +// entry.getValue().getStartKey().getRow().toString()); +// expected_end = URI_MARKER_STR + obj_end.stringValue(); +// assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX, +// entry.getValue().getEndKey().getRow().toString()); +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/pom.xml ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml new file mode 100644 index 0000000..30b9a4c --- /dev/null +++ b/dao/mongodb.rya/pom.xml @@ 
-0,0 +1,30 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>mongodb.rya</artifactId> + <name>${project.groupId}.${project.artifactId}</name> + <properties> + + </properties> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>rya.dao</artifactId> + <version>3.2.9</version> + </parent> + <dependencies> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>2.13.0-rc0</version> + </dependency> + <dependency> + <groupId>de.flapdoodle.embed</groupId> + <artifactId>de.flapdoodle.embed.mongo</artifactId> + <version>1.50.0</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java new file mode 100644 index 0000000..c215184 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java @@ -0,0 +1,187 @@ +package mvm.rya.mongodb; + +import info.aduna.iteration.CloseableIteration; + +import java.io.Closeable; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.query.BatchRyaQuery; +import mvm.rya.api.persist.query.RyaQuery; 
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import mvm.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterable;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterator;
+
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+
+/**
+ * {@link RyaQueryEngine} backed by the MongoDB triples collection named by
+ * {@link MongoDBRdfConfiguration#getTriplesCollectionName()}.
+ * <p>
+ * Every {@link RyaStatement} pattern is turned into a MongoDB query document by a
+ * {@link SimpleMongoDBStorageStrategy}. The {@link MongoClient} opened in the
+ * constructor is owned by this engine and released by {@link #close()}.
+ */
+public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguration>, Closeable {
+
+    private MongoDBRdfConfiguration configuration;
+    private MongoClient mongoClient;
+    private DBCollection coll;
+    private MongoDBStorageStrategy strategy;
+
+    /**
+     * Connects to the MongoDB instance described by {@code conf} and binds to its
+     * triples collection.
+     * <p>
+     * Host, port and database name are read through the configuration getters, so
+     * unset values fall back to {@code localhost}, {@code 27017} and {@code rya}
+     * instead of failing. {@code conf} also becomes the default configuration used
+     * when a query method is called with a {@code null} configuration.
+     *
+     * @param conf connection and collection settings; must not be {@code null}
+     * @throws NumberFormatException if the configured port is not an integer
+     * @throws UnknownHostException if the MongoDB client rejects the configured host
+     */
+    public MongoDBQueryEngine(MongoDBRdfConfiguration conf) throws NumberFormatException, UnknownHostException {
+        // Store it so getConf() and the null-conf fallback in the query methods work
+        // without a separate setConf() call.
+        this.configuration = conf;
+        mongoClient = new MongoClient(conf.getMongoInstance(), Integer.parseInt(conf.getMongoPort()));
+        DB db = mongoClient.getDB(conf.getMongoDBName());
+        coll = db.getCollection(conf.getTriplesCollectionName());
+        this.strategy = new SimpleMongoDBStorageStrategy();
+    }
+
+    /**
+     * Replaces the default configuration used when a query method is given a
+     * {@code null} configuration. The MongoDB connection and collection chosen in
+     * the constructor are not changed.
+     */
+    @Override
+    public void setConf(MongoDBRdfConfiguration conf) {
+        configuration = conf;
+    }
+
+    /** @return the default configuration (initially the one passed to the constructor) */
+    @Override
+    public MongoDBRdfConfiguration getConf() {
+        return configuration;
+    }
+
+    /** Returns {@code conf}, or this engine's default configuration when {@code conf} is {@code null}. */
+    private MongoDBRdfConfiguration resolveConf(MongoDBRdfConfiguration conf) {
+        return conf != null ? conf : configuration;
+    }
+
+    /**
+     * Finds all statements matching the single pattern {@code stmt}.
+     *
+     * @param stmt triple pattern to match
+     * @param conf per-call configuration; {@code null} means use {@link #getConf()}
+     * @return iteration over the matches; {@code conf.getLimit()}, when non-null, is
+     *         applied as the iterator's max results
+     */
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> query(
+            RyaStatement stmt, MongoDBRdfConfiguration conf)
+            throws RyaDAOException {
+        conf = resolveConf(conf);
+        Long maxResults = conf.getLimit();
+        Set<DBObject> queries = new HashSet<DBObject>();
+        queries.add(strategy.getQuery(stmt));
+        RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy);
+
+        if (maxResults != null) {
+            iterator.setMaxResults(maxResults);
+        }
+        return iterator;
+    }
+
+    /**
+     * Runs several triple patterns, each paired with the binding set it came from,
+     * and returns the matches together with those binding sets.
+     * <p>
+     * NOTE(review): patterns are keyed by their MongoDB query document, so if two
+     * statements translate to equal query documents only the last binding set is
+     * kept. Confirm whether callers can send duplicate patterns.
+     *
+     * @param stmts statement patterns paired with their originating binding sets
+     * @param conf per-call configuration; {@code null} means use {@link #getConf()}
+     * @throws RyaDAOException wrapping any exception raised while building the
+     *         queries or the cursor iterator
+     */
+    @Override
+    public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(
+            Collection<Entry<RyaStatement, BindingSet>> stmts,
+            MongoDBRdfConfiguration conf) throws RyaDAOException {
+        conf = resolveConf(conf);
+        Long maxResults = conf.getLimit();
+        Map<DBObject, BindingSet> rangeMap = new HashMap<DBObject, BindingSet>();
+
+        //TODO: cannot span multiple tables here
+        try {
+            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
+                RyaStatement stmt = stmtbs.getKey();
+                BindingSet bs = stmtbs.getValue();
+                DBObject query = strategy.getQuery(stmt);
+                rangeMap.put(query, bs);
+            }
+
+            // TODO not sure what to do about regex ranges?
+            RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator(coll, rangeMap, strategy);
+
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Finds statements matching any of the patterns in {@code stmts}.
+     *
+     * @param stmts triple patterns to match
+     * @param conf per-call configuration; {@code null} means use {@link #getConf()}
+     * @throws RyaDAOException wrapping any exception raised while building the
+     *         queries or the cursor iterator
+     */
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
+            Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf)
+            throws RyaDAOException {
+        conf = resolveConf(conf);
+        Long maxResults = conf.getLimit();
+        Set<DBObject> queries = new HashSet<DBObject>();
+
+        try {
+            for (RyaStatement stmt : stmts) {
+                queries.add(strategy.getQuery(stmt));
+            }
+
+            // TODO not sure what to do about regex ranges?
+            RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy);
+
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Finds statements matching the query document built from {@code ryaQuery}.
+     * Unlike the configuration-based methods, no {@code getLimit()} cap is applied
+     * here.
+     *
+     * @throws RyaDAOException wrapping any exception raised while building the query
+     *         or the iterable
+     */
+    @Override
+    public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery)
+            throws RyaDAOException {
+        Set<DBObject> queries = new HashSet<DBObject>();
+
+        try {
+            queries.add(strategy.getQuery(ryaQuery));
+
+            // TODO not sure what to do about regex ranges?
+            // TODO this is gross
+            RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy)));
+
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /**
+     * Finds statements matching any of the patterns in {@code batchRyaQuery}.
+     * No {@code getLimit()} cap is applied here.
+     *
+     * @throws RyaDAOException wrapping any exception raised while building the
+     *         queries or the iterable
+     */
+    @Override
+    public CloseableIterable<RyaStatement> query(BatchRyaQuery batchRyaQuery)
+            throws RyaDAOException {
+        try {
+            Set<DBObject> queries = new HashSet<DBObject>();
+            for (RyaStatement statement : batchRyaQuery.getQueries()) {
+                queries.add(strategy.getQuery(statement));
+            }
+
+            // TODO not sure what to do about regex ranges?
+            // TODO this is gross
+            RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy)));
+
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    /** Closes the MongoDB client opened by the constructor, if any. */
+    @Override
+    public void close() throws IOException {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
new file mode 100644
index 0000000..0054847
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
@@ -0,0 +1,101 @@
+package mvm.rya.mongodb;
+
+
+import java.util.List;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Rya configuration for the MongoDB backend: connection settings, collection
+ * naming, the {@code mongo.db.test} flag and additional secondary indexers, layered
+ * on top of {@link RdfCloudTripleStoreConfiguration}.
+ */
+public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
+    public static final String MONGO_INSTANCE = "mongo.db.instance";
+    public static final String MONGO_INSTANCE_PORT = "mongo.db.port";
+    public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist";
+    public static final String MONGO_DB_NAME = "mongo.db.name";
+    public static final String MONGO_COLLECTION_PREFIX = "mongo.db.collectionprefix";
+    public static final String MONGO_USER = "mongo.db.user";
+    public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword";
+    public static final String USE_TEST_MONGO = "mongo.db.test";
+    public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
+
+    /** Default collection prefix and database name when none is configured. */
+    private static final String DEFAULT_NAME = "rya";
+    /** Default host returned by {@link #getMongoInstance()}. */
+    private static final String DEFAULT_INSTANCE = "localhost";
+    /** Default port returned by {@link #getMongoPort()}. */
+    private static final String DEFAULT_PORT = "27017";
+
+    public MongoDBRdfConfiguration() {
+        super();
+    }
+
+    /** Creates a configuration initialised from {@code other} via the superclass copy constructor. */
+    public MongoDBRdfConfiguration(Configuration other) {
+        super(other);
+    }
+
+    /** Returns a new {@code MongoDBRdfConfiguration} constructed from this one. */
+    @Override
+    public MongoDBRdfConfiguration clone() {
+        return new MongoDBRdfConfiguration(this);
+    }
+
+    /** @return whether {@link #USE_TEST_MONGO} is set; {@code false} by default */
+    public boolean getUseTestMongo() {
+        return this.getBoolean(USE_TEST_MONGO, false);
+    }
+
+    public void setUseTestMongo(boolean useTestMongo) {
+        this.setBoolean(USE_TEST_MONGO, useTestMongo);
+    }
+
+    /** @return the triples collection name: the collection prefix followed by {@code _triples} */
+    public String getTriplesCollectionName() {
+        return getCollectionName() + "_triples";
+    }
+
+    /** @return the collection prefix ({@link #MONGO_COLLECTION_PREFIX}), {@code rya} by default */
+    public String getCollectionName() {
+        return this.get(MONGO_COLLECTION_PREFIX, DEFAULT_NAME);
+    }
+
+    public void setCollectionName(String name) {
+        this.set(MONGO_COLLECTION_PREFIX, name);
+    }
+
+    /** @return the MongoDB host ({@link #MONGO_INSTANCE}), {@code localhost} by default */
+    public String getMongoInstance() {
+        return this.get(MONGO_INSTANCE, DEFAULT_INSTANCE);
+    }
+
+    public void setMongoInstance(String host) {
+        this.set(MONGO_INSTANCE, host);
+    }
+
+    /** @return the MongoDB port as a string ({@link #MONGO_INSTANCE_PORT}), {@code 27017} by default */
+    public String getMongoPort() {
+        return this.get(MONGO_INSTANCE_PORT, DEFAULT_PORT);
+    }
+
+    public void setMongoPort(String port) {
+        this.set(MONGO_INSTANCE_PORT, port);
+    }
+
+    /** @return the database name ({@link #MONGO_DB_NAME}), {@code rya} by default */
+    public String getMongoDBName() {
+        return this.get(MONGO_DB_NAME, DEFAULT_NAME);
+    }
+
+    public void setMongoDBName(String name) {
+        this.set(MONGO_DB_NAME, name);
+    }
+
+    /** @return the namespaces collection name: the collection prefix followed by {@code _ns} */
+    public String getNameSpacesCollectionName() {
+        return getCollectionName() + "_ns";
+    }
+
+    /**
+     * Stores the fully qualified class names of {@code indexers} under
+     * {@link #CONF_ADDITIONAL_INDEXERS}.
+     */
+    public void setAdditionalIndexers(Class<? extends RyaSecondaryIndexer>... indexers) {
+        List<String> strs = Lists.newArrayList();
+        for (Class<? extends RyaSecondaryIndexer> ai : indexers) {
+            strs.add(ai.getName());
+        }
+        setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[strs.size()]));
+    }
+
+    /**
+     * @return the indexers listed under {@link #CONF_ADDITIONAL_INDEXERS}, as produced
+     *         by {@code getInstances(CONF_ADDITIONAL_INDEXERS, RyaSecondaryIndexer.class)}
+     */
+    public List<RyaSecondaryIndexer> getAdditionalIndexers() {
+        return getInstances(CONF_ADDITIONAL_INDEXERS, RyaSecondaryIndexer.class);
+    }
+}
