Repository: incubator-rya
Updated Branches:
  refs/heads/master b03b18938 -> b42191bd8


RYA-261-ContextValidation; closes #146, closes #96


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b42191bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b42191bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b42191bd

Branch: refs/heads/master
Commit: b42191bd8596f3774988a5b07152e98fed63cafd
Parents: b03b189
Author: Caleb Meier <[email protected]>
Authored: Wed Mar 15 09:40:28 2017 -0700
Committer: pujav65 <[email protected]>
Committed: Tue Apr 4 09:08:37 2017 -0400

----------------------------------------------------------------------
 .../accumulo/query/AccumuloRyaQueryEngine.java  | 160 +++++----
 .../accumulo/query/RangeBindingSetEntries.java  |  66 ++--
 .../RdfCloudTripleStoreConnection.java          |   8 +-
 .../ParallelEvaluationStrategyImpl.java         |  13 +-
 .../evaluation/StatementPatternEvalTest.java    | 327 +++++++++++++++++++
 5 files changed, 488 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java
 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java
index b9e53cf..888d896 100644
--- 
a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java
+++ 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java
@@ -19,8 +19,6 @@ package org.apache.rya.accumulo.query;
  * under the License.
  */
 
-
-
 import static org.apache.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
 import info.aduna.iteration.CloseableIteration;
 
@@ -57,6 +55,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -75,8 +74,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterators;
 
 /**
- * Date: 7/17/12
- * Time: 9:28 AM
+ * Date: 7/17/12 Time: 9:28 AM
  */
 public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfiguration> {
 
@@ -99,7 +97,8 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
     }
 
     @Override
-    public CloseableIteration<RyaStatement, RyaDAOException> 
query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException {
+    public CloseableIteration<RyaStatement, RyaDAOException> 
query(RyaStatement stmt, AccumuloRdfConfiguration conf)
+            throws RyaDAOException {
         if (conf == null) {
             conf = configuration;
         }
@@ -115,32 +114,39 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
     }
 
     @Override
-    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, 
BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException {
+    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> queryWithBindingSet(
+            Collection<Map.Entry<RyaStatement, BindingSet>> stmts, 
AccumuloRdfConfiguration conf) throws RyaDAOException {
         if (conf == null) {
             conf = configuration;
         }
-        //query configuration
+        // query configuration
         Authorizations authorizations = conf.getAuthorizations();
         Long ttl = conf.getTtl();
         Long maxResults = conf.getLimit();
         Integer maxRanges = conf.getMaxRangesForScanner();
         Integer numThreads = conf.getNumThreads();
 
-        //TODO: cannot span multiple tables here
+        // TODO: cannot span multiple tables here
         try {
             Collection<Range> ranges = new HashSet<Range>();
             RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
             TABLE_LAYOUT layout = null;
             RyaURI context = null;
             TriplePatternStrategy strategy = null;
-            boolean contextSet = false;
+            RyaURI columnFamily = null;
+            boolean columnFamilySet = false;
             for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
                 RyaStatement stmt = stmtbs.getKey();
-                if(!contextSet) {
-                    context = stmt.getContext();
-                    contextSet =  true;
-                } else if(context != null && 
!context.equals(stmt.getContext())) {
-                    context = null;
+                context = stmt.getContext();
+                // if all RyaStatements for this query have the same context,
+                // then set the columnFamily to be that value so that Scanner 
can fetch
+                // only that ColumnFamily. Otherwise set columnFamily to null 
so that
+                // Scanner will fetch all ColumnFamilies.
+                if (!columnFamilySet) {
+                    columnFamily = context;
+                    columnFamilySet = true;
+                } else if (columnFamily != null && 
!columnFamily.equals(context)) {
+                    columnFamily = null;
                 }
                 BindingSet bs = stmtbs.getValue();
                 strategy = ryaContext.retrieveStrategy(stmt);
@@ -148,19 +154,50 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
                     throw new IllegalArgumentException("TriplePattern[" + stmt 
+ "] not supported");
                 }
 
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
-                        strategy.defineRange(stmt.getSubject(), 
stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry = strategy.defineRange(stmt.getSubject(),
+                        stmt.getPredicate(), stmt.getObject(), 
stmt.getContext(), conf);
 
-                //use range to set scanner
-                //populate scanner based on authorizations, ttl
+                // use range to set scanner
+                // populate scanner based on authorizations, ttl
                 layout = entry.getKey();
                 ByteRange byteRange = entry.getValue();
                 Range range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
+                Range rangeMapRange = range;
+                // if context != null, bind context info to Range so that
+                // ColumnFamily of Keys returned by Scanner
+                // can be compared to ColumnFamily of start and stop Keys of
+                // Range -- important when querying for named
+                // graphs by requiring that Statements have same context Value
+                // as the Value specified in the BindingSet
+                if (context != null) {
+                    byte[] contextBytes = context.getData().getBytes("UTF-8");
+                    rangeMapRange = range.bound(new Column(contextBytes, new 
byte[] { (byte) 0x00 }, new byte[] { (byte) 0x00 }),
+                            new Column(contextBytes, new byte[] { (byte) 0xff 
}, new byte[] { (byte) 0xff }));
+                }
+                // ranges gets a Range that has no Column bounds, but
+                // rangeMap gets a Range that does have Column bounds
+                // If we inserted multiple Ranges with the same Row (but
+                // distinct Column bounds) into the Set ranges, we would get
+                // duplicate
+                // results when the Row is not exact. So RyaStatements that
+                // differ only in their context are all mapped to the same
+                // Range (with no Column bounds) for scanning purposes.
+                // However, context information is included in a Column that
+                // bounds the Range inserted into rangeMap. This is because
+                // in the class {@link RyaStatementBindingSetKeyValueIterator},
+                // the rangeMap is
+                // used to join the scan results with the BindingSets to 
produce
+                // the query results. The additional ColumnFamily info is
+                // required in this join
+                // process to allow for the Statement contexts to be compared
+                // with the BindingSet contexts
+                // See {@link RangeBindingSetEntries#containsKey}.
                 ranges.add(range);
-                rangeMap.ranges.add(new 
RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
+                rangeMap.put(rangeMapRange, bs);
             }
-            //no ranges
-            if (layout == null) return null;
+            // no ranges
+            if (layout == null)
+                return null;
             String regexSubject = conf.getRegexSubject();
             String regexPredicate = conf.getRegexPredicate();
             String regexObject = conf.getRegexObject();
@@ -172,7 +209,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
             if (useBatchScanner) {
                 ScannerBase scanner = connector.createBatchScanner(table, 
authorizations, numThreads);
                 ((BatchScanner) scanner).setRanges(ranges);
-                fillScanner(scanner, context, null, ttl, null, tripleRowRegex, 
conf);
+                fillScanner(scanner, columnFamily, null, ttl, null, 
tripleRowRegex, conf);
                 iterator = new RyaStatementBindingSetKeyValueIterator(layout, 
ryaContext, scanner, rangeMap);
             } else {
                 Scanner scannerBase = null;
@@ -181,7 +218,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
                 for (Range range : ranges) {
                     scannerBase = connector.createScanner(table, 
authorizations);
                     scannerBase.setRange(range);
-                    fillScanner(scannerBase, context, null, ttl, null, 
tripleRowRegex, conf);
+                    fillScanner(scannerBase, columnFamily, null, ttl, null, 
tripleRowRegex, conf);
                     iters[i] = scannerBase.iterator();
                     i++;
                 }
@@ -216,7 +253,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         RyaStatement stmt = ryaQuery.getQuery();
         Preconditions.checkNotNull(stmt);
 
-        //query configuration
+        // query configuration
         String[] auths = ryaQuery.getAuths();
         Authorizations authorizations = auths != null ? new 
Authorizations(auths) : configuration.getAuthorizations();
         Long ttl = ryaQuery.getTtl();
@@ -229,7 +266,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         TableLayoutStrategy tableLayoutStrategy = 
configuration.getTableLayoutStrategy();
 
         try {
-            //find triple pattern range
+            // find triple pattern range
             TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
             TABLE_LAYOUT layout;
             Range range;
@@ -240,9 +277,9 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
             String qualifier = stmt.getQualifer();
             TripleRowRegex tripleRowRegex = null;
             if (strategy != null) {
-                //otherwise, full table scan is supported
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
-                        strategy.defineRange(subject, predicate, object, 
context, null);
+                // otherwise, full table scan is supported
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry = strategy.defineRange(subject, predicate, object,
+                        context, null);
                 layout = entry.getKey();
                 ByteRange byteRange = entry.getValue();
                 range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
@@ -255,7 +292,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
 
             byte[] objectTypeInfo = null;
             if (object != null) {
-                //TODO: Not good to serialize this twice
+                // TODO: Not good to serialize this twice
                 if (object instanceof RyaRange) {
                     objectTypeInfo = 
RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
                 } else {
@@ -265,8 +302,8 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
 
             tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, 
regexObject, null, objectTypeInfo);
 
-            //use range to set scanner
-            //populate scanner based on authorizations, ttl
+            // use range to set scanner
+            // populate scanner based on authorizations, ttl
             String table = layoutToTable(layout, tableLayoutStrategy);
             Scanner scanner = connector.createScanner(table, authorizations);
             scanner.setRange(range);
@@ -293,7 +330,7 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         Iterable<RyaStatement> stmts = ryaQuery.getQueries();
         Preconditions.checkNotNull(stmts);
 
-        //query configuration
+        // query configuration
         String[] auths = ryaQuery.getAuths();
         final Authorizations authorizations = auths != null ? new 
Authorizations(auths) : configuration.getAuthorizations();
         final Long ttl = ryaQuery.getTtl();
@@ -307,31 +344,32 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         TableLayoutStrategy tableLayoutStrategy = 
configuration.getTableLayoutStrategy();
         int maxRanges = ryaQuery.getMaxRanges();
 
-        //TODO: cannot span multiple tables here
+        // TODO: cannot span multiple tables here
         try {
             Collection<Range> ranges = new HashSet<Range>();
             TABLE_LAYOUT layout = null;
             RyaURI context = null;
             TriplePatternStrategy strategy = null;
             for (RyaStatement stmt : stmts) {
-                context = stmt.getContext(); //TODO: This will be overwritten
+                context = stmt.getContext(); // TODO: This will be overwritten
                 strategy = ryaContext.retrieveStrategy(stmt);
                 if (strategy == null) {
                     throw new IllegalArgumentException("TriplePattern[" + stmt 
+ "] not supported");
                 }
 
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
-                        strategy.defineRange(stmt.getSubject(), 
stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry = strategy.defineRange(stmt.getSubject(),
+                        stmt.getPredicate(), stmt.getObject(), 
stmt.getContext(), null);
 
-                //use range to set scanner
-                //populate scanner based on authorizations, ttl
+                // use range to set scanner
+                // populate scanner based on authorizations, ttl
                 layout = entry.getKey();
                 ByteRange byteRange = entry.getValue();
                 Range range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
                 ranges.add(range);
             }
-            //no ranges
-            if (layout == null) throw new IllegalArgumentException("No table 
layout specified");
+            // no ranges
+            if (layout == null)
+                throw new IllegalArgumentException("No table layout 
specified");
 
             final TripleRowRegex tripleRowRegex = 
strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
 
@@ -342,23 +380,25 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
                 BatchScanner scanner = connector.createBatchScanner(table, 
authorizations, numQueryThreads);
                 scanner.setRanges(ranges);
                 fillScanner(scanner, context, null, ttl, null, tripleRowRegex, 
ryaQuery.getConf());
-                results = FluentCloseableIterable.from(new 
ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
+                results = FluentCloseableIterable.from(new 
ScannerBaseCloseableIterable(scanner))
+                        
.transform(keyValueToRyaStatementFunctionMap.get(layout));
             } else {
                 final RyaURI fcontext = context;
                 final RdfCloudTripleStoreConfiguration fconf = 
ryaQuery.getConf();
-                FluentIterable<RyaStatement> fluent = 
FluentIterable.from(ranges).transformAndConcat(new Function<Range, 
Iterable<Map.Entry<Key, Value>>>() {
-                    @Override
-                    public Iterable<Map.Entry<Key, Value>> apply(Range range) {
-                        try {
-                            Scanner scanner = connector.createScanner(table, 
authorizations);
-                            scanner.setRange(range);
-                            fillScanner(scanner, fcontext, null, ttl, null, 
tripleRowRegex, fconf);
-                            return scanner;
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }).transform(keyValueToRyaStatementFunctionMap.get(layout));
+                FluentIterable<RyaStatement> fluent = 
FluentIterable.from(ranges)
+                        .transformAndConcat(new Function<Range, 
Iterable<Map.Entry<Key, Value>>>() {
+                            @Override
+                            public Iterable<Map.Entry<Key, Value>> apply(Range 
range) {
+                                try {
+                                    Scanner scanner = 
connector.createScanner(table, authorizations);
+                                    scanner.setRange(range);
+                                    fillScanner(scanner, fcontext, null, ttl, 
null, tripleRowRegex, fconf);
+                                    return scanner;
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        
}).transform(keyValueToRyaStatementFunctionMap.get(layout));
 
                 results = 
FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
             }
@@ -371,7 +411,8 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         }
     }
 
-    protected void fillScanner(ScannerBase scanner, RyaURI context, String 
qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, 
RdfCloudTripleStoreConfiguration conf) throws IOException {
+    protected void fillScanner(ScannerBase scanner, RyaURI context, String 
qualifier, Long ttl, Long currentTime,
+            TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration 
conf) throws IOException {
         if (context != null && qualifier != null) {
             scanner.fetchColumn(new Text(context.getData()), new 
Text(qualifier));
         } else if (context != null) {
@@ -383,8 +424,8 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
         }
         if (ttl != null) {
             IteratorSetting setting = new IteratorSetting(9, "fi", 
TimestampFilter.class.getName());
-            TimestampFilter.setStart(setting,  System.currentTimeMillis() - 
ttl, true);
-            if(currentTime != null){
+            TimestampFilter.setStart(setting, System.currentTimeMillis() - 
ttl, true);
+            if (currentTime != null) {
                 TimestampFilter.setStart(setting, currentTime - ttl, true);
                 TimestampFilter.setEnd(setting, currentTime, true);
             }
@@ -397,8 +438,9 @@ public class AccumuloRyaQueryEngine implements 
RyaQueryEngine<AccumuloRdfConfigu
             scanner.addScanIterator(setting);
         }
         if (conf instanceof AccumuloRdfConfiguration) {
-            //TODO should we take the iterator settings as is or should we 
adjust the priority based on the above?
-            for (IteratorSetting itr : 
((AccumuloRdfConfiguration)conf).getAdditionalIterators()) {
+            // TODO should we take the iterator settings as is or should we
+            // adjust the priority based on the above?
+            for (IteratorSetting itr : ((AccumuloRdfConfiguration) 
conf).getAdditionalIterators()) {
                 scanner.addScanIterator(itr);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java
 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java
index 35ef776..4887ba0 100644
--- 
a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java
+++ 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java
@@ -1,5 +1,9 @@
 package org.apache.rya.accumulo.query;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,40 +23,64 @@ package org.apache.rya.accumulo.query;
  * under the License.
  */
 
-
-
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.openrdf.query.BindingSet;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 /**
- * Class RangeBindingSetCollection
- * Date: Feb 23, 2011
- * Time: 10:15:48 AM
+ * Class RangeBindingSetCollection Date: Feb 23, 2011 Time: 10:15:48 AM
  */
 public class RangeBindingSetEntries {
-    public Collection<Map.Entry<Range, BindingSet>> ranges;
+    private Multimap<Range, BindingSet> ranges = HashMultimap.create();
 
     public RangeBindingSetEntries() {
-        this(new ArrayList<Map.Entry<Range, BindingSet>>());
+        ranges = HashMultimap.create();
     }
 
-    public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> 
ranges) {
-        this.ranges = ranges;
+    public void put(Range range, BindingSet bs) {
+        ranges.put(range, bs);
     }
 
     public Collection<BindingSet> containsKey(Key key) {
-        //TODO: need to find a better way to sort these and pull
-        //TODO: maybe fork/join here
-        Collection<BindingSet> bss = new ArrayList<BindingSet>();
-        for (Map.Entry<Range, BindingSet> entry : ranges) {
-            if (entry.getKey().contains(key))
-                bss.add(entry.getValue());
+        Set<BindingSet> bsSet = new HashSet<>();
+        for (Range range : ranges.keySet()) {
+            // Check to see if the Key falls within Range and has same 
ColumnFamily
+            // as beginning and ending key of Range.
+            // The additional ColumnFamily check by the method
+            // validateContext(...) is necessary because range.contains(key)
+            // returns true if only the Row is within the Range but the 
ColumnFamily
+            // doesn't fall within the Range ColumnFamily bounds.
+            if (range.contains(key) && validateContext(key.getColumnFamily(), 
range.getStartKey().getColumnFamily(),
+                    range.getEndKey().getColumnFamily())) {
+                bsSet.addAll(ranges.get(range));
+            }
+        }
+        return bsSet;
+    }
+
+    /**
+     * 
+     * @param colFamily
+     * @param startColFamily
+     * @param stopColFamily
+     * @return true if colFamily lies between startColFamily and stopColFamily
+     */
+    private boolean validateContext(Text colFamily, Text startColFamily, Text 
stopColFamily) {
+        byte[] cfBytes = colFamily.getBytes();
+        byte[] start = startColFamily.getBytes();
+        byte[] stop = stopColFamily.getBytes();
+        // range has empty column family, so all Keys falling within Range Row
+        // constraints should match
+        if (start.length == 0 && stop.length == 0) {
+            return true;
         }
-        return bss;
+        int result1 = WritableComparator.compareBytes(cfBytes, 0, 
cfBytes.length, start, 0, start.length);
+        int result2 = WritableComparator.compareBytes(cfBytes, 0, 
cfBytes.length, stop, 0, stop.length);
+        return result1 >= 0 && result2 <= 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
----------------------------------------------------------------------
diff --git 
a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
 
b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
index 85440fd..ea8db77 100644
--- 
a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
+++ 
b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
@@ -296,7 +296,7 @@ public class RdfCloudTripleStoreConnection extends 
SailConnectionBase {
             }
             
             final ParallelEvaluationStrategyImpl strategy = new 
ParallelEvaluationStrategyImpl(
-                    new StoreTripleSource(queryConf), inferenceEngine, 
dataset, queryConf);
+                    new StoreTripleSource(queryConf, ryaDAO), inferenceEngine, 
dataset, queryConf);
             
                 (new BindingAssigner()).optimize(tupleExpr, dataset, bindings);
                 (new ConstantOptimizer(strategy)).optimize(tupleExpr, dataset,
@@ -591,12 +591,14 @@ public class RdfCloudTripleStoreConnection extends 
SailConnectionBase {
         //TODO: ?
     }
 
-    public class StoreTripleSource implements TripleSource {
+    public static class StoreTripleSource implements TripleSource {
 
         private RdfCloudTripleStoreConfiguration conf;
+        private RyaDAO<?> ryaDAO;
 
-        public StoreTripleSource(RdfCloudTripleStoreConfiguration conf) {
+        public StoreTripleSource(RdfCloudTripleStoreConfiguration conf, 
RyaDAO<?> ryaDAO) {
             this.conf = conf;
+            this.ryaDAO = ryaDAO;
         }
 
         public CloseableIteration<Statement, QueryEvaluationException> 
getStatements(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java
----------------------------------------------------------------------
diff --git 
a/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java
 
b/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java
index c1eb68c..a3b70b6 100644
--- 
a/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java
+++ 
b/sail/src/main/java/org/apache/rya/rdftriplestore/evaluation/ParallelEvaluationStrategyImpl.java
@@ -42,6 +42,7 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.RdfCloudTripleStoreUtils;
 import org.apache.rya.api.utils.NullableStatementImpl;
 import org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection;
+import 
org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection.StoreTripleSource;
 import org.apache.rya.rdftriplestore.inference.InferenceEngine;
 import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
 import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
@@ -83,7 +84,7 @@ public class ParallelEvaluationStrategyImpl extends 
EvaluationStrategyImpl {
     private ExecutorService executorService;
     private InferenceEngine inferenceEngine;
 
-    public 
ParallelEvaluationStrategyImpl(RdfCloudTripleStoreConnection.StoreTripleSource 
tripleSource, InferenceEngine inferenceEngine,
+    public ParallelEvaluationStrategyImpl(StoreTripleSource tripleSource, 
InferenceEngine inferenceEngine,
                                           Dataset dataset, 
RdfCloudTripleStoreConfiguration conf) {
         super(tripleSource, dataset);
         Integer nthreads = conf.getNumThreads();
@@ -221,16 +222,18 @@ public class ParallelEvaluationStrategyImpl extends 
EvaluationStrategyImpl {
                 Statement st = stbs.getKey();
                 BindingSet bs = stbs.getValue();
                 QueryBindingSet result = new QueryBindingSet(bs);
-                if (subjVar != null && !result.hasBinding(subjVar.getName())) {
+                //only add values to result BindingSet if Var is not constant 
and BindingSet doesn't already
+                //contain a Value for that Var name
+                if (subjVar != null && !subjVar.isConstant() && 
!result.hasBinding(subjVar.getName())) {
                     result.addBinding(subjVar.getName(), st.getSubject());
                 }
-                if (predVar != null && !result.hasBinding(predVar.getName())) {
+                if (predVar != null && !predVar.isConstant() && 
!result.hasBinding(predVar.getName())) {
                     result.addBinding(predVar.getName(), st.getPredicate());
                 }
-                if (objVar != null && !result.hasBinding(objVar.getName())) {
+                if (objVar != null && !objVar.isConstant() && 
!result.hasBinding(objVar.getName())) {
                     result.addBinding(objVar.getName(), st.getObject());
                 }
-                if (cntxtVar != null && !result.hasBinding(cntxtVar.getName()) 
&& st.getContext() != null) {
+                if (cntxtVar != null && !cntxtVar.isConstant() && 
!result.hasBinding(cntxtVar.getName()) && st.getContext() != null) {
                     result.addBinding(cntxtVar.getName(), st.getContext());
                 }
                 return result;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b42191bd/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java
----------------------------------------------------------------------
diff --git 
a/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java
 
b/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java
new file mode 100644
index 0000000..c1e7d10
--- /dev/null
+++ 
b/sail/src/test/java/org/apache/rya/rdftriplestore/evaluation/StatementPatternEvalTest.java
@@ -0,0 +1,327 @@
+package org.apache.rya.rdftriplestore.evaluation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.persist.RyaDAOException;
+import 
org.apache.rya.rdftriplestore.RdfCloudTripleStoreConnection.StoreTripleSource;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import info.aduna.iteration.CloseableIteration;
+
+public class StatementPatternEvalTest {
+
+    private AccumuloRyaDAO dao;
+    private AccumuloRdfConfiguration conf;
+    private ParallelEvaluationStrategyImpl eval;
+
+    /**
+     * Builds the fixtures used by every test: the configuration returned by
+     * {@link #getConf()}, an {@link AccumuloRyaDAO} connected to a mock
+     * Accumulo instance, and the evaluation strategy under test.
+     */
+    @Before
+    public void init() throws Exception {
+        conf = getConf();
+
+        // Connect to the mock Accumulo instance as "root" with an empty password.
+        Instance mockInstance = new MockInstance("instance");
+        Connector connector = mockInstance.getConnector("root", new PasswordToken(""));
+
+        dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        dao.init();
+
+        // The strategy reads statements through a triple source backed by the DAO.
+        StoreTripleSource tripleSource = new StoreTripleSource(conf, dao);
+        eval = new ParallelEvaluationStrategyImpl(tripleSource, null, null, conf);
+    }
+
+    /**
+     * Releases the fixtures created in {@link #init()}.
+     *
+     * The DAO is destroyed in a {@code finally} block so that a failure while
+     * shutting down the evaluation strategy cannot leave it open. Both fields
+     * are null-checked because JUnit runs {@code @After} methods even when
+     * the {@code @Before} method failed part-way through.
+     *
+     * @throws RyaDAOException if the DAO cannot be destroyed
+     */
+    @After
+    public void close() throws RyaDAOException {
+        try {
+            if (eval != null) {
+                eval.shutdown();
+            }
+        } finally {
+            if (dao != null) {
+                dao.destroy();
+            }
+        }
+    }
+    
+    /**
+     * Evaluates a statement pattern with a variable context against three
+     * statements stored in three different contexts, using a single empty
+     * binding set as the constraint. Expects one result per statement, each
+     * binding both {@code x} and {@code c}.
+     */
+    @Test
+    public void simpleQueryWithoutBindingSets()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        //query is used to build statement that will be evaluated
+        String query = "select ?x ?c where{ graph ?c  {?x <uri:talksTo> <uri:Bob>. }}";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+
+        RyaStatement statement1 = new RyaStatement(
+                new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement2 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context2"), "", new StatementMetadata());
+        RyaStatement statement3 = new RyaStatement(
+                new RyaURI("uri:Eric"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context3"), "", new StatementMetadata());
+        List<RyaStatement> statements = Arrays.asList(statement1, statement2, statement3);
+
+        try {
+            for (RyaStatement statement : statements) {
+                dao.add(statement);
+            }
+
+            QueryBindingSet bsConstraint1 = new QueryBindingSet();
+
+            CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                    eval.evaluate(spList.get(0), Arrays.asList(bsConstraint1));
+
+            List<BindingSet> bsList = new ArrayList<>();
+            try {
+                while (iteration.hasNext()) {
+                    bsList.add(iteration.next());
+                }
+            } finally {
+                // Release the iteration even if draining it fails.
+                iteration.close();
+            }
+
+            Assert.assertEquals(3, bsList.size());
+
+            QueryBindingSet expected1 = new QueryBindingSet();
+            expected1.addBinding("x", new URIImpl("uri:Joe"));
+            expected1.addBinding("c", new URIImpl("uri:context1"));
+
+            QueryBindingSet expected2 = new QueryBindingSet();
+            expected2.addBinding("x", new URIImpl("uri:Doug"));
+            expected2.addBinding("c", new URIImpl("uri:context2"));
+
+            QueryBindingSet expected3 = new QueryBindingSet();
+            expected3.addBinding("x", new URIImpl("uri:Eric"));
+            expected3.addBinding("c", new URIImpl("uri:context3"));
+
+            Set<BindingSet> expected = new HashSet<>(Arrays.asList(expected1, expected2, expected3));
+            Set<BindingSet> actual = new HashSet<>(bsList);
+
+            Assert.assertEquals(expected, actual);
+        } finally {
+            // Remove the fixtures even when an assertion fails.
+            // NOTE(review): presumably the mock instance is shared between tests, so
+            // leftover statements would leak into later tests - confirm.
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a statement pattern with a variable context against three
+     * statements stored in three different contexts, constrained by two
+     * binding sets that bind {@code c} to context2 and context1. Expects
+     * only the statements from those two contexts.
+     */
+    @Test
+    public void simpleQueryWithBindingSets()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        //query is used to build statement that will be evaluated
+        String query = "select ?x ?c where{ graph ?c  {?x <uri:talksTo> <uri:Bob>. }}";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+
+        RyaStatement statement1 = new RyaStatement(
+                new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement2 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context2"), "", new StatementMetadata());
+        RyaStatement statement3 = new RyaStatement(
+                new RyaURI("uri:Eric"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context3"), "", new StatementMetadata());
+        List<RyaStatement> statements = Arrays.asList(statement1, statement2, statement3);
+
+        try {
+            for (RyaStatement statement : statements) {
+                dao.add(statement);
+            }
+
+            QueryBindingSet bsConstraint1 = new QueryBindingSet();
+            bsConstraint1.addBinding("c", new URIImpl("uri:context2"));
+
+            QueryBindingSet bsConstraint2 = new QueryBindingSet();
+            bsConstraint2.addBinding("c", new URIImpl("uri:context1"));
+
+            CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                    eval.evaluate(spList.get(0), Arrays.asList(bsConstraint1, bsConstraint2));
+
+            List<BindingSet> bsList = new ArrayList<>();
+            try {
+                while (iteration.hasNext()) {
+                    bsList.add(iteration.next());
+                }
+            } finally {
+                // Release the iteration even if draining it fails.
+                iteration.close();
+            }
+
+            Assert.assertEquals(2, bsList.size());
+
+            QueryBindingSet expected1 = new QueryBindingSet();
+            expected1.addBinding("x", new URIImpl("uri:Joe"));
+            expected1.addBinding("c", new URIImpl("uri:context1"));
+
+            QueryBindingSet expected2 = new QueryBindingSet();
+            expected2.addBinding("x", new URIImpl("uri:Doug"));
+            expected2.addBinding("c", new URIImpl("uri:context2"));
+
+            Set<BindingSet> expected = new HashSet<>(Arrays.asList(expected1, expected2));
+            Set<BindingSet> actual = new HashSet<>(bsList);
+
+            Assert.assertEquals(expected, actual);
+        } finally {
+            // Remove the fixtures even when an assertion fails.
+            // NOTE(review): presumably the mock instance is shared between tests, so
+            // leftover statements would leak into later tests - confirm.
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    
+    /**
+     * Evaluates a statement pattern with a variable context, constrained by
+     * two identical binding sets that both bind {@code c} to context1.
+     * Expects exactly one result: the single statement stored in context1.
+     */
+    @Test
+    public void simpleQueryWithBindingSetSameContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        //query is used to build statement that will be evaluated
+        String query = "select ?x ?c where{ graph ?c  {?x <uri:talksTo> <uri:Bob>. }}";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+
+        RyaStatement statement1 = new RyaStatement(
+                new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement2 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context2"), "", new StatementMetadata());
+        RyaStatement statement3 = new RyaStatement(
+                new RyaURI("uri:Eric"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context3"), "", new StatementMetadata());
+        List<RyaStatement> statements = Arrays.asList(statement1, statement2, statement3);
+
+        try {
+            for (RyaStatement statement : statements) {
+                dao.add(statement);
+            }
+
+            QueryBindingSet bsConstraint1 = new QueryBindingSet();
+            bsConstraint1.addBinding("c", new URIImpl("uri:context1"));
+
+            QueryBindingSet bsConstraint2 = new QueryBindingSet();
+            bsConstraint2.addBinding("c", new URIImpl("uri:context1"));
+
+            CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                    eval.evaluate(spList.get(0), Arrays.asList(bsConstraint1, bsConstraint2));
+
+            List<BindingSet> bsList = new ArrayList<>();
+            try {
+                while (iteration.hasNext()) {
+                    bsList.add(iteration.next());
+                }
+            } finally {
+                // Release the iteration even if draining it fails.
+                iteration.close();
+            }
+
+            Assert.assertEquals(1, bsList.size());
+
+            QueryBindingSet expected = new QueryBindingSet();
+            expected.addBinding("x", new URIImpl("uri:Joe"));
+            expected.addBinding("c", new URIImpl("uri:context1"));
+
+            Assert.assertEquals(expected, bsList.get(0));
+        } finally {
+            // Remove the fixtures even when an assertion fails.
+            // NOTE(review): presumably the mock instance is shared between tests, so
+            // leftover statements would leak into later tests - confirm.
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+    
+    /**
+     * Evaluates a statement pattern whose context is the constant
+     * {@code <uri:context1>}, using a single empty binding set as the
+     * constraint. Expects one result that binds only {@code x}; the expected
+     * binding set contains no binding for the constant context.
+     */
+    @Test
+    public void simpleQueryNoBindingSetConstantContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        //query is used to build statement that will be evaluated
+        String query = "select ?x ?c where{ graph <uri:context1>  {?x <uri:talksTo> <uri:Bob>. }}";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+
+        RyaStatement statement1 = new RyaStatement(
+                new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement2 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context2"), "", new StatementMetadata());
+        RyaStatement statement3 = new RyaStatement(
+                new RyaURI("uri:Eric"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context3"), "", new StatementMetadata());
+        List<RyaStatement> statements = Arrays.asList(statement1, statement2, statement3);
+
+        try {
+            for (RyaStatement statement : statements) {
+                dao.add(statement);
+            }
+
+            QueryBindingSet bsConstraint1 = new QueryBindingSet();
+
+            CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                    eval.evaluate(spList.get(0), Arrays.asList(bsConstraint1));
+
+            List<BindingSet> bsList = new ArrayList<>();
+            try {
+                while (iteration.hasNext()) {
+                    bsList.add(iteration.next());
+                }
+            } finally {
+                // Release the iteration even if draining it fails.
+                iteration.close();
+            }
+
+            Assert.assertEquals(1, bsList.size());
+
+            QueryBindingSet expected = new QueryBindingSet();
+            expected.addBinding("x", new URIImpl("uri:Joe"));
+
+            Assert.assertEquals(expected, bsList.get(0));
+        } finally {
+            // Remove the fixtures even when an assertion fails.
+            // NOTE(review): presumably the mock instance is shared between tests, so
+            // leftover statements would leak into later tests - confirm.
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    /**
+     * Evaluates a statement pattern whose context is the constant
+     * {@code <uri:context1>}, constrained by a binding set that binds
+     * {@code x} to {@code uri:Doug}. Doug's statement is stored in both
+     * context1 and context2; expects a single result that binds only
+     * {@code x}.
+     */
+    @Test
+    public void simpleQueryWithBindingSetConstantContext()
+            throws MalformedQueryException, QueryEvaluationException, RyaDAOException {
+        //query is used to build statement that will be evaluated
+        String query = "select ?x ?c where{ graph <uri:context1>  {?x <uri:talksTo> <uri:Bob>. }}";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> spList = StatementPatternCollector.process(pq.getTupleExpr());
+
+        RyaStatement statement1 = new RyaStatement(
+                new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement2 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context1"), "", new StatementMetadata());
+        RyaStatement statement3 = new RyaStatement(
+                new RyaURI("uri:Doug"), new RyaURI("uri:talksTo"), new RyaType("uri:Bob"),
+                new RyaURI("uri:context2"), "", new StatementMetadata());
+        List<RyaStatement> statements = Arrays.asList(statement1, statement2, statement3);
+
+        try {
+            for (RyaStatement statement : statements) {
+                dao.add(statement);
+            }
+
+            QueryBindingSet bsConstraint1 = new QueryBindingSet();
+            bsConstraint1.addBinding("x", new URIImpl("uri:Doug"));
+
+            CloseableIteration<BindingSet, QueryEvaluationException> iteration =
+                    eval.evaluate(spList.get(0), Arrays.asList(bsConstraint1));
+
+            List<BindingSet> bsList = new ArrayList<>();
+            try {
+                while (iteration.hasNext()) {
+                    bsList.add(iteration.next());
+                }
+            } finally {
+                // Release the iteration even if draining it fails.
+                iteration.close();
+            }
+
+            Assert.assertEquals(1, bsList.size());
+
+            QueryBindingSet expected = new QueryBindingSet();
+            expected.addBinding("x", new URIImpl("uri:Doug"));
+
+            Assert.assertEquals(expected, bsList.get(0));
+        } finally {
+            // Remove the fixtures even when an assertion fails.
+            // NOTE(review): presumably the mock instance is shared between tests, so
+            // leftover statements would leak into later tests - confirm.
+            dao.delete(statements.iterator(), conf);
+        }
+    }
+
+    
+    /**
+     * Creates the configuration shared by all tests: the "rya_" table prefix
+     * plus the connection settings (user "root", empty password, instance
+     * name "instance", empty authorizations).
+     *
+     * @return a freshly populated {@link AccumuloRdfConfiguration}
+     */
+    private static AccumuloRdfConfiguration getConf() {
+        final AccumuloRdfConfiguration configuration = new AccumuloRdfConfiguration();
+        // Table prefix first, then the connection settings.
+        configuration.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
+        configuration.set("sc.cloudbase.username", "root");
+        configuration.set("sc.cloudbase.password", "");
+        configuration.set("sc.cloudbase.instancename", "instance");
+        configuration.set("sc.cloudbase.authorizations", "");
+        return configuration;
+    }
+
+
+}
+


Reply via email to