Repository: incubator-rya Updated Branches: refs/heads/master b03b18938 -> b42191bd8
RYA-261-ContextValidation; closes #146, closes #96 Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b42191bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b42191bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b42191bd Branch: refs/heads/master Commit: b42191bd8596f3774988a5b07152e98fed63cafd Parents: b03b189 Author: Caleb Meier <[email protected]> Authored: Wed Mar 15 09:40:28 2017 -0700 Committer: pujav65 <[email protected]> Committed: Tue Apr 4 09:08:37 2017 -0400 ---------------------------------------------------------------------- .../accumulo/query/AccumuloRyaQueryEngine.java | 160 +++++---- .../accumulo/query/RangeBindingSetEntries.java | 66 ++-- .../RdfCloudTripleStoreConnection.java | 8 +- .../ParallelEvaluationStrategyImpl.java | 13 +- .../evaluation/StatementPatternEvalTest.java | 327 +++++++++++++++++++ 5 files changed, 488 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java index b9e53cf..888d896 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java @@ -19,8 +19,6 @@ package org.apache.rya.accumulo.query; * under the License. 
*/ - - import static org.apache.rya.api.RdfCloudTripleStoreUtils.layoutToTable; import info.aduna.iteration.CloseableIteration; @@ -57,6 +55,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -75,8 +74,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterators; /** - * Date: 7/17/12 - * Time: 9:28 AM + * Date: 7/17/12 Time: 9:28 AM */ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> { @@ -99,7 +97,8 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu } @Override - public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException { + public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) + throws RyaDAOException { if (conf == null) { conf = configuration; } @@ -115,32 +114,39 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu } @Override - public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException { + public CloseableIteration<? 
extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet( + Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException { if (conf == null) { conf = configuration; } - //query configuration + // query configuration Authorizations authorizations = conf.getAuthorizations(); Long ttl = conf.getTtl(); Long maxResults = conf.getLimit(); Integer maxRanges = conf.getMaxRangesForScanner(); Integer numThreads = conf.getNumThreads(); - //TODO: cannot span multiple tables here + // TODO: cannot span multiple tables here try { Collection<Range> ranges = new HashSet<Range>(); RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); TABLE_LAYOUT layout = null; RyaURI context = null; TriplePatternStrategy strategy = null; - boolean contextSet = false; + RyaURI columnFamily = null; + boolean columnFamilySet = false; for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) { RyaStatement stmt = stmtbs.getKey(); - if(!contextSet) { - context = stmt.getContext(); - contextSet = true; - } else if(context != null && !context.equals(stmt.getContext())) { - context = null; + context = stmt.getContext(); + // if all RyaStatements for this query have the same context, + // then set the columnFamily to be that value so that Scanner can fetch + // only that ColumnFamily. Otherwise set columnFamily to null so that + // Scanner will fetch all ColumnFamilies. 
+ if (!columnFamilySet) { + columnFamily = context; + columnFamilySet = true; + } else if (columnFamily != null && !columnFamily.equals(context)) { + columnFamily = null; } BindingSet bs = stmtbs.getValue(); strategy = ryaContext.retrieveStrategy(stmt); @@ -148,19 +154,50 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); } - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf); + Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(stmt.getSubject(), + stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf); - //use range to set scanner - //populate scanner based on authorizations, ttl + // use range to set scanner + // populate scanner based on authorizations, ttl layout = entry.getKey(); ByteRange byteRange = entry.getValue(); Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); + Range rangeMapRange = range; + // if context != null, bind context info to Range so that + // ColumnFamily Keys returned by Scanner + // can be compared to ColumnFamily of start and stop Keys of + // Range -- important when querying for named + // graphs by requiring that Statements have same context Value + // as the Value specified in the BindingSet + if (context != null) { + byte[] contextBytes = context.getData().getBytes("UTF-8"); + rangeMapRange = range.bound(new Column(contextBytes, new byte[] { (byte) 0x00 }, new byte[] { (byte) 0x00 }), + new Column(contextBytes, new byte[] { (byte) 0xff }, new byte[] { (byte) 0xff })); + } + // ranges gets a Range that has no Column bounds, but + // rangeMap gets a Range that does have Column bounds + // If we inserted multiple Ranges with the same Row (but + // distinct Column bounds) into the Set ranges, we would get + // 
duplicate + // results when the Row is not exact. So RyaStatements that + // differ only in their context are all mapped to the same + // Range (with no Column bounds) for scanning purposes. + // However, context information is included in a Column that + // bounds the Range inserted into rangeMap. This is because + // in the class {@link RyaStatementBindingSetKeyValueIterator}, + // the rangeMap is + // used to join the scan results with the BindingSets to produce + // the query results. The additional ColumnFamily info is + // required in this join + // process to allow for the Statement contexts to be compared + // with the BindingSet contexts + // See {@link RangeBindingSetEntries#containsKey}. ranges.add(range); - rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs)); + rangeMap.put(rangeMapRange, bs); } - //no ranges - if (layout == null) return null; + // no ranges + if (layout == null) + return null; String regexSubject = conf.getRegexSubject(); String regexPredicate = conf.getRegexPredicate(); String regexObject = conf.getRegexObject(); @@ -172,7 +209,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu if (useBatchScanner) { ScannerBase scanner = connector.createBatchScanner(table, authorizations, numThreads); ((BatchScanner) scanner).setRanges(ranges); - fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf); + fillScanner(scanner, columnFamily, null, ttl, null, tripleRowRegex, conf); iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap); } else { Scanner scannerBase = null; @@ -181,7 +218,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu for (Range range : ranges) { scannerBase = connector.createScanner(table, authorizations); scannerBase.setRange(range); - fillScanner(scannerBase, context, null, ttl, null, tripleRowRegex, conf); + fillScanner(scannerBase, columnFamily, null, ttl, null, 
tripleRowRegex, conf); iters[i] = scannerBase.iterator(); i++; } @@ -216,7 +253,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu RyaStatement stmt = ryaQuery.getQuery(); Preconditions.checkNotNull(stmt); - //query configuration + // query configuration String[] auths = ryaQuery.getAuths(); Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations(); Long ttl = ryaQuery.getTtl(); @@ -229,7 +266,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); try { - //find triple pattern range + // find triple pattern range TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt); TABLE_LAYOUT layout; Range range; @@ -240,9 +277,9 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu String qualifier = stmt.getQualifer(); TripleRowRegex tripleRowRegex = null; if (strategy != null) { - //otherwise, full table scan is supported - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(subject, predicate, object, context, null); + // otherwise, full table scan is supported + Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject, predicate, object, + context, null); layout = entry.getKey(); ByteRange byteRange = entry.getValue(); range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); @@ -255,7 +292,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu byte[] objectTypeInfo = null; if (object != null) { - //TODO: Not good to serialize this twice + // TODO: Not good to serialize this twice if (object instanceof RyaRange) { objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1]; } else { @@ -265,8 +302,8 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo); - //use range to set scanner - //populate scanner based on authorizations, ttl + // use range to set scanner + // populate scanner based on authorizations, ttl String table = layoutToTable(layout, tableLayoutStrategy); Scanner scanner = connector.createScanner(table, authorizations); scanner.setRange(range); @@ -293,7 +330,7 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu Iterable<RyaStatement> stmts = ryaQuery.getQueries(); Preconditions.checkNotNull(stmts); - //query configuration + // query configuration String[] auths = ryaQuery.getAuths(); final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations(); final Long ttl = ryaQuery.getTtl(); @@ -307,31 +344,32 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); int maxRanges = ryaQuery.getMaxRanges(); - //TODO: cannot span multiple tables here + // TODO: cannot span multiple tables here try { Collection<Range> ranges = new HashSet<Range>(); TABLE_LAYOUT layout = null; RyaURI context = null; TriplePatternStrategy strategy = null; for (RyaStatement stmt : stmts) { - context = stmt.getContext(); //TODO: This will be overwritten + context = stmt.getContext(); // TODO: This will be overwritten strategy = ryaContext.retrieveStrategy(stmt); if (strategy == null) { throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); } - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null); + Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(stmt.getSubject(), + stmt.getPredicate(), stmt.getObject(), 
stmt.getContext(), null); - //use range to set scanner - //populate scanner based on authorizations, ttl + // use range to set scanner + // populate scanner based on authorizations, ttl layout = entry.getKey(); ByteRange byteRange = entry.getValue(); Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); ranges.add(range); } - //no ranges - if (layout == null) throw new IllegalArgumentException("No table layout specified"); + // no ranges + if (layout == null) + throw new IllegalArgumentException("No table layout specified"); final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null); @@ -342,23 +380,25 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads); scanner.setRanges(ranges); fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf()); - results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout)); + results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)) + .transform(keyValueToRyaStatementFunctionMap.get(layout)); } else { final RyaURI fcontext = context; final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf(); - FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() { - @Override - public Iterable<Map.Entry<Key, Value>> apply(Range range) { - try { - Scanner scanner = connector.createScanner(table, authorizations); - scanner.setRange(range); - fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf); - return scanner; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }).transform(keyValueToRyaStatementFunctionMap.get(layout)); + FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges) + 
.transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() { + @Override + public Iterable<Map.Entry<Key, Value>> apply(Range range) { + try { + Scanner scanner = connector.createScanner(table, authorizations); + scanner.setRange(range); + fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf); + return scanner; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }).transform(keyValueToRyaStatementFunctionMap.get(layout)); results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent)); } @@ -371,7 +411,8 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu } } - protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException { + protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, + TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException { if (context != null && qualifier != null) { scanner.fetchColumn(new Text(context.getData()), new Text(qualifier)); } else if (context != null) { @@ -383,8 +424,8 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu } if (ttl != null) { IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName()); - TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true); - if(currentTime != null){ + TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true); + if (currentTime != null) { TimestampFilter.setStart(setting, currentTime - ttl, true); TimestampFilter.setEnd(setting, currentTime, true); } @@ -397,8 +438,9 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu scanner.addScanIterator(setting); } if (conf instanceof AccumuloRdfConfiguration) { - //TODO should we take the iterator settings as is or should we 
adjust the priority based on the above? - for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) { + // TODO should we take the iterator settings as is or should we + // adjust the priority based on the above? + for (IteratorSetting itr : ((AccumuloRdfConfiguration) conf).getAdditionalIterators()) { scanner.addScanIterator(itr); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java index 35ef776..4887ba0 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java @@ -1,5 +1,9 @@ package org.apache.rya.accumulo.query; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,40 +23,64 @@ package org.apache.rya.accumulo.query; * under the License. 
*/ - - import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; import org.openrdf.query.BindingSet; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; /** - * Class RangeBindingSetCollection - * Date: Feb 23, 2011 - * Time: 10:15:48 AM + * Class RangeBindingSetCollection Date: Feb 23, 2011 Time: 10:15:48 AM */ public class RangeBindingSetEntries { - public Collection<Map.Entry<Range, BindingSet>> ranges; + private Multimap<Range, BindingSet> ranges = HashMultimap.create(); public RangeBindingSetEntries() { - this(new ArrayList<Map.Entry<Range, BindingSet>>()); + ranges = HashMultimap.create(); } - public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> ranges) { - this.ranges = ranges; + public void put(Range range, BindingSet bs) { + ranges.put(range, bs); } public Collection<BindingSet> containsKey(Key key) { - //TODO: need to find a better way to sort these and pull - //TODO: maybe fork/join here - Collection<BindingSet> bss = new ArrayList<BindingSet>(); - for (Map.Entry<Range, BindingSet> entry : ranges) { - if (entry.getKey().contains(key)) - bss.add(entry.getValue()); + Set<BindingSet> bsSet = new HashSet<>(); + for (Range range : ranges.keySet()) { + // Check to see if the Key falls within Range and has same ColumnFamily + // as beginning and ending key of Range. + // The additional ColumnFamily check by the method + // validateContext(...) is necessary because range.contains(key) + // returns true if only the Row is within the Range but the ColumnFamily + // doesn't fall within the Range ColumnFamily bounds. 
+ if (range.contains(key) && validateContext(key.getColumnFamily(), range.getStartKey().getColumnFamily(), + range.getEndKey().getColumnFamily())) { + bsSet.addAll(ranges.get(range)); + } + } + return bsSet; + } + + /** + * + * @param colFamily + * @param startColFamily + * @param stopColFamily + * @return true if colFamily lies between startColFamily and stopColFamily + */ + private boolean validateContext(Text colFamily, Text startColFamily, Text stopColFamily) { + byte[] cfBytes = colFamily.getBytes(); + byte[] start = startColFamily.getBytes(); + byte[] stop = stopColFamily.getBytes(); + // range has empty column family, so all Keys falling with Range Row + // constraints should match + if (start.length == 0 && stop.length == 0) { + return true; } - return bss; + int result1 = WritableComparator.compareBytes(cfBytes, 0, cfBytes.length, start, 0, start.length); + int result2 = WritableComparator.compareBytes(cfBytes, 0, cfBytes.length, stop, 0, stop.length); + return result1 >= 0 && result2 <= 0; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java ---------------------------------------------------------------------- diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java index 85440fd..ea8db77 100644 --- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java +++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java @@ -296,7 +296,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase { } final ParallelEvaluationStrategyImpl strategy = new ParallelEvaluationStrategyImpl( - new StoreTripleSource(queryConf), inferenceEngine, dataset, queryConf); + new StoreTripleSource(queryConf, ryaDAO), inferenceEngine, dataset, queryConf); (new 
BindingAssigner()).optimize(tupleExpr, dataset, bindings); (new ConstantOptimizer(strategy)).optimize(tupleExpr, dataset, @@ -591,12 +591,14 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase { //TODO: ? } - public class StoreTripleSource implements TripleSource { + public static class StoreTripleSource implements TripleSource { private RdfCloudTripleStoreConfiguration conf; + private RyaDAO<?> ryaDAO; - public StoreTripleSource(RdfCloudTripleStoreConfiguration conf) { + public StoreTripleSource(RdfCloudTripleStoreConfiguration conf, RyaDAO<?> ryaDAO) { this.conf = conf; + this.ryaDAO = ryaDAO; } public CloseableIteration<Statement, QueryEvaluationException> getStatements( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java b/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java index c1eb68c..a3b70b6 100644 --- a/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java +++ b/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java @@ -42,6 +42,7 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.RdfCloudTripleStoreUtils; import org.apache.rya.api.utils.NullableStatementImpl; import org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection; +import org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection.StoreTripleSource; import org.apache.rya.rdftriplestore.inference.InferenceEngine; import org.apache.rya.rdftriplestore.inference.InferenceEngineException; import org.apache.rya.rdftriplestore.utils.FixedStatementPattern; @@ -83,7 +84,7 @@ public class ParallelEvaluationStrategyImpl extends 
EvaluationStrategyImpl { private ExecutorService executorService; private InferenceEngine inferenceEngine; - public ParallelEvaluationStrategyImpl(RdfCloudTripleStoreConnection.StoreTripleSource tripleSource, InferenceEngine inferenceEngine, + public ParallelEvaluationStrategyImpl(StoreTripleSource tripleSource, InferenceEngine inferenceEngine, Dataset dataset, RdfCloudTripleStoreConfiguration conf) { super(tripleSource, dataset); Integer nthreads = conf.getNumThreads(); @@ -221,16 +222,18 @@ public class ParallelEvaluationStrategyImpl extends EvaluationStrategyImpl { Statement st = stbs.getKey(); BindingSet bs = stbs.getValue(); QueryBindingSet result = new QueryBindingSet(bs); - if (subjVar != null && !result.hasBinding(subjVar.getName())) { + //only add values to result BindingSet if Var is not constant and BindingSet doesn't already + //contain a Value for that Var name + if (subjVar != null && !subjVar.isConstant() && !result.hasBinding(subjVar.getName())) { result.addBinding(subjVar.getName(), st.getSubject()); } - if (predVar != null && !result.hasBinding(predVar.getName())) { + if (predVar != null && !predVar.isConstant() && !result.hasBinding(predVar.getName())) { result.addBinding(predVar.getName(), st.getPredicate()); } - if (objVar != null && !result.hasBinding(objVar.getName())) { + if (objVar != null && !objVar.isConstant() && !result.hasBinding(objVar.getName())) { result.addBinding(objVar.getName(), st.getObject()); } - if (cntxtVar != null && !result.hasBinding(cntxtVar.getName()) && st.getContext() != null) { + if (cntxtVar != null && !cntxtVar.isConstant() && !result.hasBinding(cntxtVar.getName()) && st.getContext() != null) { result.addBinding(cntxtVar.getName(), st.getContext()); } return result; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java ---------------------------------------------------------------------- diff --git 
a/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java b/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java new file mode 100644 index 0000000..c1e7d10 --- /dev/null +++ b/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java @@ -0,0 +1,327 @@ +package org.apache.rya.rdftriplestore.evaluation; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection.StoreTripleSource; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import info.aduna.iteration.CloseableIteration; + +public class StatementPatternEvalTest { + + private AccumuloRyaDAO dao; + private AccumuloRdfConfiguration conf; + private ParallelEvaluationStrategyImpl eval; + + @Before + public void init() throws Exception { + conf = getConf(); + Instance mock = new MockInstance("instance"); + Connector conn = mock.getConnector("root", new PasswordToken("")); + dao = new AccumuloRyaDAO(); + dao.setConnector(conn); + dao.init(); + eval = new ParallelEvaluationStrategyImpl(new StoreTripleSource(conf, dao), null, null, conf); + } + 
+    /** Shuts down the evaluation strategy and destroys the DAO after each test. */
+    @After
+    public void close() throws RyaDAOException {
+        eval.shutdown();
+        dao.destroy();
+    }
+
+    /**
+     * Evaluates a pattern whose context is a variable, using a single empty binding set.
+     * Every stored statement should come back with both ?x and ?c bound.
+     */
+    @Test
+    public void simpleQueryWithoutBindingSets()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        // the query is only used to build the StatementPattern that will be evaluated
+        StatementPattern sp = parseStatementPattern(
+                "select ?x ?c where{ graph ?c {?x <uri:talksTo> <uri:Bob>. }}");
+
+        List<RyaStatement> statements = Arrays.asList(
+                talksToBob("uri:Joe", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context2"),
+                talksToBob("uri:Eric", "uri:context3"));
+        try {
+            addStatements(statements);
+
+            List<BindingSet> bsList = evaluateAndCollect(sp, new QueryBindingSet());
+            Assert.assertEquals(3, bsList.size());
+
+            Set<BindingSet> expected = new HashSet<>(Arrays.asList(
+                    bindingSet("uri:Joe", "uri:context1"),
+                    bindingSet("uri:Doug", "uri:context2"),
+                    bindingSet("uri:Eric", "uri:context3")));
+            Set<BindingSet> actual = new HashSet<>(bsList);
+            Assert.assertEquals(expected, actual);
+        } finally {
+            // remove the statements even when an assertion above fails
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a pattern whose context is a variable against two binding sets that each fix ?c.
+     * Only the statements stored in those two contexts should be returned.
+     */
+    @Test
+    public void simpleQueryWithBindingSets()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        // the query is only used to build the StatementPattern that will be evaluated
+        StatementPattern sp = parseStatementPattern(
+                "select ?x ?c where{ graph ?c {?x <uri:talksTo> <uri:Bob>. }}");
+
+        List<RyaStatement> statements = Arrays.asList(
+                talksToBob("uri:Joe", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context2"),
+                talksToBob("uri:Eric", "uri:context3"));
+        try {
+            addStatements(statements);
+
+            List<BindingSet> bsList = evaluateAndCollect(sp,
+                    constraint("c", "uri:context2"),
+                    constraint("c", "uri:context1"));
+            Assert.assertEquals(2, bsList.size());
+
+            Set<BindingSet> expected = new HashSet<>(Arrays.asList(
+                    bindingSet("uri:Joe", "uri:context1"),
+                    bindingSet("uri:Doug", "uri:context2")));
+            Set<BindingSet> actual = new HashSet<>(bsList);
+            Assert.assertEquals(expected, actual);
+        } finally {
+            // remove the statements even when an assertion above fails
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a pattern whose context is a variable against two identical binding sets.
+     * The matching statement should be returned exactly once.
+     */
+    @Test
+    public void simpleQueryWithBindingSetSameContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        // the query is only used to build the StatementPattern that will be evaluated
+        StatementPattern sp = parseStatementPattern(
+                "select ?x ?c where{ graph ?c {?x <uri:talksTo> <uri:Bob>. }}");
+
+        List<RyaStatement> statements = Arrays.asList(
+                talksToBob("uri:Joe", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context2"),
+                talksToBob("uri:Eric", "uri:context3"));
+        try {
+            addStatements(statements);
+
+            List<BindingSet> bsList = evaluateAndCollect(sp,
+                    constraint("c", "uri:context1"),
+                    constraint("c", "uri:context1"));
+            Assert.assertEquals(1, bsList.size());
+            Assert.assertEquals(bindingSet("uri:Joe", "uri:context1"), bsList.get(0));
+        } finally {
+            // remove the statements even when an assertion above fails
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a pattern with a constant context, using a single empty binding set.
+     * Only the statement in that context should match, and only ?x should be bound.
+     */
+    @Test
+    public void simpleQueryNoBindingSetConstantContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        // the query is only used to build the StatementPattern that will be evaluated
+        StatementPattern sp = parseStatementPattern(
+                "select ?x ?c where{ graph <uri:context1> {?x <uri:talksTo> <uri:Bob>. }}");
+
+        List<RyaStatement> statements = Arrays.asList(
+                talksToBob("uri:Joe", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context2"),
+                talksToBob("uri:Eric", "uri:context3"));
+        try {
+            addStatements(statements);
+
+            List<BindingSet> bsList = evaluateAndCollect(sp, new QueryBindingSet());
+            Assert.assertEquals(1, bsList.size());
+            Assert.assertEquals(bindingSet("uri:Joe"), bsList.get(0));
+        } finally {
+            // remove the statements even when an assertion above fails
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a pattern with a constant context against a binding set that fixes ?x.
+     * Only the statement matching both the subject and the context should be returned.
+     */
+    @Test
+    public void simpleQueryWithBindingSetConstantContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        // the query is only used to build the StatementPattern that will be evaluated
+        StatementPattern sp = parseStatementPattern(
+                "select ?x ?c where{ graph <uri:context1> {?x <uri:talksTo> <uri:Bob>. }}");
+
+        List<RyaStatement> statements = Arrays.asList(
+                talksToBob("uri:Joe", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context1"),
+                talksToBob("uri:Doug", "uri:context2"));
+        try {
+            addStatements(statements);
+
+            List<BindingSet> bsList = evaluateAndCollect(sp, constraint("x", "uri:Doug"));
+            Assert.assertEquals(1, bsList.size());
+            Assert.assertEquals(bindingSet("uri:Doug"), bsList.get(0));
+        } finally {
+            // remove the statements even when an assertion above fails
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /** Parses {@code query} and returns the first statement pattern found in it. */
+    private static StatementPattern parseStatementPattern(String query) throws MalformedQueryException {
+        ParsedQuery pq = new SPARQLParser().parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+        return spList.get(0);
+    }
+
+    /** Builds the statement {@code subject uri:talksTo uri:Bob}, stored in {@code context}. */
+    private static RyaStatement talksToBob(String subject, String context) {
+        return new RyaStatement(new RyaURI(subject), new RyaURI("uri:talksTo"),
+                new RyaType("uri:Bob"), new RyaURI(context), "", new StatementMetadata());
+    }
+
+    /** Adds each statement to the DAO, one at a time. */
+    private void addStatements(List<RyaStatement> statements) throws RyaDAOException {
+        for (RyaStatement statement : statements) {
+            dao.add(statement);
+        }
+    }
+
+    /**
+     * Evaluates {@code sp} against the given constraint binding sets and drains all results.
+     * The iteration is closed afterwards, including when draining throws.
+     */
+    private List<BindingSet> evaluateAndCollect(StatementPattern sp, BindingSet... constraints)
+            throws QueryEvaluationException {
+        CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                eval.evaluate(sp, Arrays.asList(constraints));
+        try {
+            List<BindingSet> results = new ArrayList<>();
+            while (iteration.hasNext()) {
+                results.add(iteration.next());
+            }
+            return results;
+        } finally {
+            iteration.close();
+        }
+    }
+
+    /** Returns a binding set holding the single URI binding {@code name -> uri}. */
+    private static BindingSet constraint(String name, String uri) {
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding(name, new URIImpl(uri));
+        return bs;
+    }
+
+    /** Returns the expected result binding set {@code ?x -> x}. */
+    private static BindingSet bindingSet(String x) {
+        return constraint("x", x);
+    }
+
+    /** Returns the expected result binding set {@code ?x -> x, ?c -> c}. */
+    private static BindingSet bindingSet(String x, String c) {
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", new URIImpl(x));
+        bs.addBinding("c", new URIImpl(c));
+        return bs;
+    }
+
+    /**
+     * Builds the configuration used by this test: table prefix {@code rya_}, user {@code root}
+     * with an empty password and authorizations, and instance name {@code instance}.
+     */
+    private static AccumuloRdfConfiguration getConf() {
+
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
+        conf.set("sc.cloudbase.username", "root");
+        conf.set("sc.cloudbase.password", "");
+        conf.set("sc.cloudbase.instancename", "instance");
+        conf.set("sc.cloudbase.authorizations", "");
+
+        return conf;
+    }
+
+}
+
