http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java
new file mode 100644
index 0000000..097c52c
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java
@@ -0,0 +1,493 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.TableNotFoundException;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.Authorizations;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import info.aduna.iteration.CloseableIteration;
+import info.aduna.iteration.EmptyIteration;
+import mvm.mmrts.rdf.partition.PartitionSail;
+import mvm.mmrts.rdf.partition.query.evaluation.select.FilterIterator;
+import mvm.mmrts.rdf.partition.query.evaluation.select.SelectAllIterator;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.URI;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+import ss.cloudbase.core.iterators.CellLevelRecordIterator;
+import ss.cloudbase.core.iterators.GMDenIntersectingIterator;
+import ss.cloudbase.core.iterators.SortedRangeIterator;
+import ss.cloudbase.core.iterators.filter.CBConverter;
+
+import java.io.IOException;
+import java.util.*;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+/**
+ * Class ShardSubjectLookupStatementIterator
+ * Date: Jul 18, 2011
+ * Time: 10:53:55 AM
+ */
+public class ShardSubjectLookupStatementIterator implements
+        CloseableIteration<BindingSet, QueryEvaluationException> {
+
+    private Connector connector;
+    private String table;
+    //MMRTS-148
+    private String shardTable;
+    private ShardSubjectLookup lookup;
+    private DateHashModShardValueGenerator generator;
+    private BatchScanner scanner;
+    private BindingSet bindings;
+    private CloseableIteration<BindingSet, QueryEvaluationException> iter;
+    private Configuration configuration;
+//    private TimeType timeType = TimeType.XMLDATETIME;
+    private Authorizations authorizations = ALL_AUTHORIZATIONS;
+
+    private int numThreads;
+
+    public ShardSubjectLookupStatementIterator(PartitionSail psail, 
ShardSubjectLookup lookup, BindingSet bindings, Configuration configuration) 
throws QueryEvaluationException {
+        this.connector = psail.getConnector();
+        this.lookup = lookup;
+        this.table = psail.getTable();
+        this.shardTable = psail.getShardTable();
+        this.bindings = bindings;
+        this.configuration = configuration;
+
+        //Time Type check
+//        timeType = TimeType.valueOf(this.configuration.get(TIME_TYPE_PROP, 
TimeType.XMLDATETIME.name()));
+
+        //authorizations
+        String auths = this.configuration.get(AUTHORIZATION_PROP);
+        if (auths != null) {
+            authorizations = new Authorizations(auths.split(","));
+        }
+
+        //TODO: for now we need this
+        this.generator = (DateHashModShardValueGenerator) psail.getGenerator();
+
+        this.numThreads = this.configuration.getInt(NUMTHREADS_PROP, 
generator.getBaseMod());
+
+        this.initialize();
+    }
+
+    /**
+     * Chooses the scanner and the result iterator that match the shape of the lookup.
+     * <ul>
+     * <li>Subject bound, no bound predicate at all: every statement of the subject is
+     * returned through a {@link SelectAllIterator}.</li>
+     * <li>Subject bound, some predicates bound: the subject's statements are scanned and
+     * passed through a {@link FilterIterator}. Bound predicate/object pairs are not pushed
+     * down to the scanner (see the TODO below).</li>
+     * <li>Subject unbound, more than one constraint (bound pairs, bound predicates, time
+     * range): intersecting scan.</li>
+     * <li>Subject unbound, exactly one bound pair or only a time range: sorted range scan.</li>
+     * <li>Subject unbound, exactly one bound predicate: plain scanner with a qualifier regex.</li>
+     * </ul>
+     * Whenever a scanner factory returns {@code null} the iterator is set to an empty iteration.
+     *
+     * @throws QueryEvaluationException if the lookup shape is not supported or the scanner
+     *                                  cannot be created; exceptions that already are
+     *                                  QueryEvaluationExceptions are rethrown unwrapped
+     */
+    public void initialize() throws QueryEvaluationException {
+        try {
+            Var subject = lookup.getSubject();
+            List<Map.Entry<Var, Var>> where = retrieveWhereClause();
+            List<Map.Entry<Var, Var>> select = retrieveSelectClause();
+
+            //global start-end time
+            long start = configuration.getLong(START_BINDING, 0);
+            long end = configuration.getLong(END_BINDING, System.currentTimeMillis());
+
+            // Number of constraints available to the index: bound pairs, bound predicates, time range.
+            int whereSize = where.size() + select.size() + (isTimeRange(lookup, configuration) ? 1 : 0);
+
+            if (subject.hasValue()) {
+                // whereSize is not used here, because the TimeRange can be set up in the scanner
+                this.scanner = scannerForSubject(subject.getValue());
+                if (this.scanner == null) {
+                    this.iter = new EmptyIteration<BindingSet, QueryEvaluationException>();
+                } else if (where.isEmpty() && select.isEmpty()) {
+                    // Case 1: Subject is set, but predicate, object are not. Return all for the subject
+                    Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0);
+                    this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(),
+                            predObj.getKey(), predObj.getValue());
+                } else {
+                    // Case 2/2a: Subject is set and some predicates (maybe objects) are set.
+                    // TODO: For now we will ignore the predicate-object filter because we do
+                    // not know how to query for this
+                    this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
+                }
+            } else if (whereSize > 1) {
+                // Case 3: Subject is not set, more than one constraint.
+                // This also covers "several bound predicates, nothing else", because every
+                // bound predicate counts towards whereSize.
+                this.scanner = scannerForPredicateObject(lookup, start, end, where, select);
+                this.iter = filterOrEmpty(this.scanner, subject, select);
+            } else if (whereSize == 1 && select.size() == 0) {
+                // Case 4: No subject, only one where clause (a bound pair, or just the time range)
+                Map.Entry<Var, Var> predObj = null;
+                if (where.size() == 1) {
+                    predObj = where.get(0);
+                }
+                this.scanner = scannerForPredicateObject(lookup, start, end, predObj);
+                this.iter = filterOrEmpty(this.scanner, subject, select);
+            } else if (select.size() == 1) {
+                // Case 5: No subject, no where (just 1 select)
+                cloudbase.core.client.Scanner sc = scannerForPredicate(lookup, start, end,
+                        (URI) select.get(0).getKey().getValue());
+                if (sc == null) {
+                    this.iter = new EmptyIteration<BindingSet, QueryEvaluationException>();
+                } else {
+                    //TODO: Fix, put in concrete class
+                    final Iterator<Map.Entry<Key, Value>> scIter = sc.iterator();
+                    this.iter = new FilterIterator(this.bindings, scIter, subject, select);
+                }
+            } else {
+                throw new QueryEvaluationException("Case not supported as of yet");
+            }
+        } catch (QueryEvaluationException e) {
+            // Already the right type: do not bury it inside a second QueryEvaluationException.
+            throw e;
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    /**
+     * Wraps the scanner in a {@link FilterIterator}, or returns an empty iteration when no
+     * scanner could be created.
+     */
+    private CloseableIteration<BindingSet, QueryEvaluationException> filterOrEmpty(
+            BatchScanner batchScanner, Var subject, List<Map.Entry<Var, Var>> select)
+            throws QueryEvaluationException {
+        if (batchScanner == null) {
+            return new EmptyIteration<BindingSet, QueryEvaluationException>();
+        }
+        return new FilterIterator(this.bindings, batchScanner.iterator(), subject, select);
+    }
+
+    protected List<Map.Entry<Var, Var>> retrieveWhereClause() {
+        List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>();
+        for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
+            Var pred = entry.getKey();
+            Var object = entry.getValue();
+            if (pred.hasValue() && object.hasValue()) {
+                where.add(entry); //TODO: maybe we should clone this?
+            }
+        }
+        return where;
+    }
+
+    protected List<Map.Entry<Var, Var>> retrieveSelectClause() {
+        List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, 
Var>>();
+        for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
+            Var pred = entry.getKey();
+            Var object = entry.getValue();
+            if (pred.hasValue() && !object.hasValue()) {
+                select.add(entry); //TODO: maybe we should clone this?
+            }
+        }
+        return select;
+    }
+
+    /**
+     * Closes the batch scanner, if one was created during initialization.
+     */
+    @Override
+    public void close() throws QueryEvaluationException {
+        if (this.scanner == null) {
+            return;
+        }
+        this.scanner.close();
+    }
+
+    /**
+     * Delegates to the result iterator selected by {@link #initialize()}.
+     */
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        final boolean more = this.iter.hasNext();
+        return more;
+    }
+
+    /**
+     * Returns the next binding set of the result iterator selected by {@link #initialize()}.
+     *
+     * @throws QueryEvaluationException if the delegate fails; a QueryEvaluationException
+     *                                  raised by the delegate is rethrown as is, any other
+     *                                  exception is wrapped
+     */
+    @Override
+    public BindingSet next() throws QueryEvaluationException {
+        try {
+            return iter.next();
+        } catch (QueryEvaluationException e) {
+            // Already the declared type: rethrow instead of nesting it in another one.
+            throw e;
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    /**
+     * Delegates removal to the result iterator selected by {@link #initialize()}.
+     * (Previously this advanced the iterator with {@code next()} instead of removing.)
+     */
+    @Override
+    public void remove() throws QueryEvaluationException {
+        iter.remove();
+    }
+
+    /**
+     * Utility methods to set up the scanner/batch scanner
+     */
+
+    protected List<Text> shardForSubject(org.openrdf.model.Value subject) 
throws TableNotFoundException, IOException {
+        BatchScanner scanner = createBatchScanner(this.shardTable);
+        try {
+            scanner.setRanges(Collections.singleton(
+                    new Range(new Text(writeValue(subject)))
+            ));
+            Iterator<Map.Entry<Key, Value>> shardIter = scanner.iterator();
+            if (!shardIter.hasNext()) {
+                return null;
+            }
+
+            List<Text> shards = new ArrayList<Text>();
+            while (shardIter.hasNext()) {
+                shards.add(shardIter.next().getKey().getColumnFamily());
+            }
+            //MMRTS-147 so that we can return subjects from multiple shards
+            return shards;
+        } finally {
+            if (scanner != null)
+                scanner.close();
+        }
+    }
+
+
+    protected BatchScanner scannerForSubject(org.openrdf.model.Value subject) 
throws TableNotFoundException, IOException {
+        List<Text> shards = shardForSubject(subject);
+
+        if (shards == null)
+            return null;
+
+        BatchScanner scanner = createBatchScanner(this.table);
+
+//        scanner.setScanIterators(21, 
CellLevelRecordIterator.class.getName(), "ci");
+        Collection<Range> ranges = new ArrayList<Range>();
+        for (Text shard : shards) {
+            ranges.add(new Range(
+                    new Key(
+                            shard, DOC,
+                            new Text(URI_MARKER_STR + subject + 
FAMILY_DELIM_STR + "\0")
+                    ),
+                    new Key(
+                            shard, DOC,
+                            new Text(URI_MARKER_STR + subject + 
FAMILY_DELIM_STR + "\uFFFD")
+                    )
+            ));
+        }
+        scanner.setRanges(ranges);
+        return scanner;
+    }
+
+    protected BatchScanner scannerForPredicateObject(ShardSubjectLookup 
lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs, 
List<Map.Entry<Var, Var>> select) throws IOException, TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        int extra = 0;
+
+        if (isTimeRange(lookup, configuration)) {
+            extra += 1;
+        }
+
+        Text[] queries = new Text[predObjs.size() + select.size() + extra];
+        int qi = 0;
+        for (Map.Entry<Var, Var> predObj : predObjs) {
+            ByteArrayDataOutput output = ByteStreams.newDataOutput();
+            writeValue(output, predObj.getKey().getValue());
+            output.write(INDEX_DELIM);
+            writeValue(output, predObj.getValue().getValue());
+            queries[qi++] = new Text(output.toByteArray());
+        }
+        for (Map.Entry<Var, Var> predicate : select) {
+            queries[qi++] = new 
Text(GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
+                    URI_MARKER_STR + predicate.getKey().getValue() + 
INDEX_DELIM_STR + "\0"
+                    , true,
+                    URI_MARKER_STR + predicate.getKey().getValue() + 
INDEX_DELIM_STR + "\uFFFD",
+                    true
+            ));
+        }
+
+        if (isTimeRange(lookup, configuration)) {
+            queries[queries.length - 1] = new Text(
+                    GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
+                            getStartTimeRange(lookup, configuration)
+                            , true,
+                            getEndTimeRange(lookup, configuration),
+                            true
+                    )
+            );
+        }
+
+        BatchScanner bs = createBatchScanner(this.table);
+
+        bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci");
+        bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, 
VALUE_DELIMITER);
+
+        bs.setScanIterators(20, GMDenIntersectingIterator.class.getName(), 
"ii");
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.docFamilyOptionName, DOC.toString());
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString());
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.columnFamiliesOptionName, 
GMDenIntersectingIterator.encodeColumns(queries));
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true);
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + 
"\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + 
"\uFFFD"))
+        );
+        bs.setRanges(Collections.singleton(
+                range
+        ));
+
+        return bs;
+    }
+
+    protected BatchScanner scannerForPredicateObject(ShardSubjectLookup 
lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, 
TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        BatchScanner bs = createBatchScanner(this.table);
+
+        bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci");
+        bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, 
VALUE_DELIMITER);
+
+        bs.setScanIterators(20, SortedRangeIterator.class.getName(), "ri");
+        bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_DOC_COLF, 
DOC.toString());
+        bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_COLF, 
INDEX.toString());
+        bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true);
+        bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true);
+        bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_MULTI_DOC, 
"" + true);
+
+        if (isTimeRange(lookup, configuration)) {
+            String startRange = getStartTimeRange(lookup, configuration);
+            bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_LOWER_BOUND,
+                    startRange);
+            String endRange = getEndTimeRange(lookup, configuration);
+            bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_UPPER_BOUND,
+                    endRange);
+        } else {
+
+            ByteArrayDataOutput output = ByteStreams.newDataOutput();
+            writeValue(output, predObj.getKey().getValue());
+            output.write(INDEX_DELIM);
+            writeValue(output, predObj.getValue().getValue());
+
+            String bound = new String(output.toByteArray());
+            bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_LOWER_BOUND, bound);
+            bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_UPPER_BOUND, bound + "\00");
+        }
+
+        //TODO: Do we add a time predicate to this?
+//        bs.setScanIterators(19, FilteringIterator.class.getName(), 
"filteringIterator");
+//        bs.setScanIteratorOption("filteringIterator", "0", 
TimeRangeFilter.class.getName());
+//        bs.setScanIteratorOption("filteringIterator", "0." + 
TimeRangeFilter.TIME_RANGE_PROP, (end - start) + "");
+//        bs.setScanIteratorOption("filteringIterator", "0." + 
TimeRangeFilter.START_TIME_PROP, end + "");
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + 
"\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + 
"\uFFFD"))
+        );
+        bs.setRanges(Collections.singleton(
+                range
+        ));
+
+        return bs;
+    }
+
+    protected BatchScanner scannerForPredicates(Long start, Long end, 
List<Map.Entry<Var, Var>> predicates) throws IOException, 
TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        int extra = 0;
+
+        if (isTimeRange(lookup, configuration)) {
+            extra += 1;
+        }
+
+        Text[] queries = new Text[predicates.size() + extra];
+        for (int i = 0; i < predicates.size(); i++) {
+            Map.Entry<Var, Var> predicate = predicates.get(i);
+            queries[i] = new 
Text(GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
+                    URI_MARKER_STR + predicate.getKey().getValue() + 
INDEX_DELIM_STR + "\0"
+                    , true,
+                    URI_MARKER_STR + predicate.getKey().getValue() + 
INDEX_DELIM_STR + "\uFFFD",
+                    true
+            ));
+        }
+
+        if (isTimeRange(lookup, configuration)) {
+            queries[queries.length - 1] = new Text(
+                    GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
+                            getStartTimeRange(lookup, configuration)
+                            , true,
+                            getEndTimeRange(lookup, configuration),
+                            true
+                    )
+            );
+        }
+
+        BatchScanner bs = createBatchScanner(this.table);
+        bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci");
+        bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, 
VALUE_DELIMITER);
+
+        bs.setScanIterators(20, GMDenIntersectingIterator.class.getName(), 
"ii");
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.docFamilyOptionName, DOC.toString());
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString());
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.columnFamiliesOptionName, 
GMDenIntersectingIterator.encodeColumns(queries));
+        bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true);
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + 
"\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + 
"\uFFFD"))
+        );
+        bs.setRanges(Collections.singleton(
+                range
+        ));
+
+        return bs;
+    }
+
+    /**
+     * Creates a plain scanner over the index entries of a single predicate, for the shards
+     * between the start and the end time.
+     * <p>
+     * The predicate URI is quoted before it is placed in the qualifier regular expression,
+     * so characters such as {@code .}, {@code ?} or {@code +} in the URI are matched
+     * literally instead of being interpreted as regex operators.
+     *
+     * @param lookup    lookup being evaluated
+     * @param start     global start time, passed through {@code validateFillStartTime}
+     * @param end       global end time, passed through {@code validateFillEndTime}
+     * @param predicate predicate whose index entries are wanted
+     * @return the configured scanner
+     */
+    protected cloudbase.core.client.Scanner scannerForPredicate(ShardSubjectLookup lookup,
+                                                                Long start, Long end, URI predicate)
+            throws IOException, TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        cloudbase.core.client.Scanner sc = createScanner(this.table);
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + "\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD"))
+        );
+        sc.setRange(range);
+        sc.fetchColumnFamily(INDEX);
+        sc.setColumnFamilyRegex(INDEX.toString());
+
+        // Literal prefix (marker + predicate + delimiter) followed by any object.
+        String literalPrefix = java.util.regex.Pattern.quote(URI_MARKER_STR + predicate + INDEX_DELIM_STR);
+        sc.setColumnQualifierRegex(literalPrefix + "(.*)");
+
+        return sc;
+    }
+
+    protected cloudbase.core.client.Scanner createScanner(String sTable) 
throws TableNotFoundException {
+        return connector.createScanner(sTable, authorizations);
+    }
+
+    protected BatchScanner createBatchScanner(String sTable) throws 
TableNotFoundException {
+        return createBatchScanner(sTable, numThreads);
+    }
+
+    protected BatchScanner createBatchScanner(String sTable, int numThreads) 
throws TableNotFoundException {
+        return connector.createBatchScanner(sTable, authorizations, 
numThreads);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java
new file mode 100644
index 0000000..782cfb9
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java
@@ -0,0 +1,178 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import mvm.mmrts.rdf.partition.utils.CountPredObjPairs;
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+/**
+ * Date: Jul 14, 2011
+ * Time: 4:14:16 PM
+ */
+public class SubjectGroupingOptimizer implements QueryOptimizer {
+
+    private static final Comparator<Var> VAR_COMPARATOR = new VarComparator();
+    private static final Comparator<StatementPattern> SP_SUBJ_COMPARATOR = new 
SubjectComparator();
+    private static final Comparator<TupleExpr> STATS_SHARD_COMPARATOR = new 
ShardLookupComparator();
+    private static final CountPredObjPairs STATISTICS = new 
CountPredObjPairs();
+    private Configuration conf;
+
+    /**
+     * @param conf configuration read by {@link #populateLookup(ShardSubjectLookup)} for the
+     *             per-subject time and shard range settings
+     */
+    public SubjectGroupingOptimizer(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Rewrites the expression in place by running a {@link FlattenJoinVisitor} over it.
+     * The dataset and the binding set are not used.
+     */
+    @Override
+    public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet) {
+        final FlattenJoinVisitor visitor = new FlattenJoinVisitor();
+        tupleExpr.visit(visitor);
+    }
+
+    /**
+     * Visitor that replaces statement patterns with {@link ShardSubjectLookup} operators.
+     * A join tree whose leaves are all statement patterns is flattened, its patterns are
+     * grouped by subject into one lookup per subject, and the lookups are re-joined in the
+     * order given by {@code STATS_SHARD_COMPARATOR}.
+     */
+    protected class FlattenJoinVisitor extends QueryModelVisitorBase<RuntimeException> {
+        @Override
+        public void meet(Join node) throws RuntimeException {
+            if (!containsOnlyStatementPatterns(node)) {
+                // getJoinArgs only collects statement patterns. Flattening a join that has any
+                // other operand would silently drop that operand from the query, so such a
+                // join keeps its shape and only its children are visited.
+                super.meet(node);
+                return;
+            }
+
+            List<StatementPattern> flatten = getJoinArgs(node, new ArrayList<StatementPattern>());
+            //order by subject
+            Collections.sort(flatten, SP_SUBJ_COMPARATOR);
+
+            List<TupleExpr> shardLookups = new ArrayList<TupleExpr>();
+            Var current = null;
+            ShardSubjectLookup shardLookupCurrent = null;
+            for (StatementPattern sp : flatten) {
+                if (!sp.getSubjectVar().hasValue() && !sp.getPredicateVar().hasValue()) {
+                    // if there is nothing set in the subject or predicate, we treat it as a single item
+                    // might be ?s ?p ?o
+                    shardLookups.add(sp);
+                } else {
+                    Var subjectVar = sp.getSubjectVar();
+                    // A new subject starts a new lookup; the comparator is only used as an equality test.
+                    if (VAR_COMPARATOR.compare(current, subjectVar) != 0) {
+                        current = subjectVar;
+                        shardLookupCurrent = new ShardSubjectLookup(current);
+                        populateLookup(shardLookupCurrent);
+                        shardLookups.add(shardLookupCurrent);
+                    }
+                    shardLookupCurrent.addPredicateObjectPair(sp.getPredicateVar(), sp.getObjectVar());
+                }
+            }
+
+            if (shardLookups.isEmpty()) {
+                // Nothing to join; leave the node untouched rather than fail on get(0).
+                return;
+            }
+
+            Collections.sort(shardLookups, STATS_SHARD_COMPARATOR);
+            // Rebuild a left-deep join over the sorted lookups.
+            TupleExpr replacement = shardLookups.get(0);
+            for (int i = 1; i < shardLookups.size(); i++) {
+                replacement = new Join(replacement, shardLookups.get(i));
+            }
+
+            node.replaceWith(replacement);
+        }
+
+        @Override
+        public void meet(StatementPattern node) throws RuntimeException {
+            ShardSubjectLookup lookup = new ShardSubjectLookup(node.getSubjectVar());
+            lookup.addPredicateObjectPair(node.getPredicateVar(), node.getObjectVar());
+            populateLookup(lookup);
+            node.replaceWith(lookup);
+        }
+
+        /**
+         * Tells whether every leaf of the (possibly nested) join is a statement pattern,
+         * i.e. whether getJoinArgs would collect all of its operands.
+         */
+        private boolean containsOnlyStatementPatterns(TupleExpr expr) {
+            if (expr instanceof Join) {
+                Join join = (Join) expr;
+                return containsOnlyStatementPatterns(join.getLeftArg())
+                        && containsOnlyStatementPatterns(join.getRightArg());
+            }
+            return expr instanceof StatementPattern;
+        }
+    }
+
+    protected <L extends List<StatementPattern>> L getJoinArgs(TupleExpr 
tupleExpr, L joinArgs) {
+        if (tupleExpr instanceof Join) {
+            Join join = (Join) tupleExpr;
+            getJoinArgs(join.getLeftArg(), joinArgs);
+            getJoinArgs(join.getRightArg(), joinArgs);
+        } else if (tupleExpr instanceof StatementPattern) {
+            joinArgs.add((StatementPattern) tupleExpr);
+        }
+
+        return joinArgs;
+    }
+
+    protected ShardSubjectLookup populateLookup(ShardSubjectLookup lookup) {
+        String timePredicate = conf.get(lookup.getSubject().getName() + "." + 
TIME_PREDICATE);
+        if (timePredicate != null) {
+            lookup.setTimePredicate(timePredicate);
+            lookup.setStartTimeRange(conf.get(lookup.getSubject().getName() + 
"." + START_BINDING));
+            lookup.setEndTimeRange(conf.get(lookup.getSubject().getName() + 
"." + END_BINDING));
+            
lookup.setTimeType(TimeType.valueOf(conf.get(lookup.getSubject().getName() + 
"." + TIME_TYPE_PROP, TimeType.XMLDATETIME.name())));
+        }
+
+        String shardRange = conf.get(lookup.getSubject().getName() + "." + 
SHARDRANGE_BINDING);
+        if(shardRange != null) {
+            
lookup.setShardStartTimeRange(conf.get(lookup.getSubject().getName() + "." + 
SHARDRANGE_START));
+            lookup.setShardEndTimeRange(conf.get(lookup.getSubject().getName() 
+ "." + SHARDRANGE_END));
+        }
+
+        return lookup;
+    }
+
+    protected static class SubjectComparator implements 
Comparator<StatementPattern> {
+
+        @Override
+        public int compare(StatementPattern a, StatementPattern b) {
+            if (a == b)
+                return 0;
+
+            if (a == null || b == null)
+                return 1;
+
+            if (a.getSubjectVar().equals(b.getSubjectVar())) {
+                if (a.getPredicateVar().hasValue() && 
b.getPredicateVar().hasValue())
+                    return 0;
+                if (a.getPredicateVar().hasValue() && 
!b.getPredicateVar().hasValue())
+                    return -1;
+                if (!a.getPredicateVar().hasValue() && 
b.getPredicateVar().hasValue())
+                    return 1;
+                return 0;
+            }
+
+            if (a.getSubjectVar().getValue() != null && 
b.getSubjectVar().getValue() != null &&
+                    
a.getSubjectVar().getValue().equals(b.getSubjectVar().getValue()))
+                return 0;
+
+            return 1;
+        }
+    }
+
+    /**
+     * Orders tuple expressions by the count reported by {@code STATISTICS}, smallest first.
+     * <p>
+     * Uses {@link Double#compare(double, double)} rather than {@code diff / |diff|}: the
+     * division yielded NaN for equal counts (only cast to 0 by accident) and also reported
+     * an infinite count as equal to a finite one.
+     */
+    protected static class ShardLookupComparator implements Comparator<TupleExpr> {
+
+        @Override
+        public int compare(TupleExpr a, TupleExpr b) {
+            double a_c = STATISTICS.getCount(a);
+            double b_c = STATISTICS.getCount(b);
+            return Double.compare(a_c, b_c);
+        }
+    }
+
+    /**
+     * Equality test on variables expressed as a comparator: returns 0 when both are the same
+     * instance, are equal, or carry equal non-null values, and 1 in every other case
+     * (including when exactly one of them is {@code null}). It never returns a negative
+     * number, so it does not define an ordering.
+     */
+    protected static class VarComparator implements Comparator<Var> {
+
+        @Override
+        public int compare(Var a, Var b) {
+            if (a == b) {
+                return 0;
+            }
+            if (a == null || b == null) {
+                return 1;
+            }
+
+            final boolean same = a.equals(b)
+                    || (a.getValue() != null
+                    && b.getValue() != null
+                    && a.getValue().equals(b.getValue()));
+            return same ? 0 : 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java
new file mode 100644
index 0000000..7da4276
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java
@@ -0,0 +1,100 @@
+package mvm.mmrts.rdf.partition.query.evaluation.select;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.Lists;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import java.util.*;
+
+/**
+ * Iteration that reads one document at a time through {@code nextDocument()} and
+ * returns, one by one, the binding sets that
+ * {@code populateBindingSet(document, subjVar, predObjs)} builds for that document.
+ * <p/>
+ * TODO: This could be done as a filtering iterator in the Iterator Stack
+ */
+public class FilterIterator extends SelectIterator {
+
+    private final List<Map.Entry<Var, Var>> predObjs;
+    // Predicate value -> predicate/object pair. Populated by the constructor only;
+    // nothing in this class reads it back.
+    private final Map<URI, Map.Entry<Var, Var>> filters = new HashMap<URI, Map.Entry<Var, Var>>();
+    private final Var subjVar;
+    // Document currently being expanded, or null when the next one still has to be read.
+    private List<Statement> document;
+    // Binding sets built for the current document, or null when none are pending.
+    private List<QueryBindingSet> currentResults;
+    private int currentResultsIndex = 0;
+
+    /**
+     * @param bindings bindings every produced binding set starts from
+     * @param iter     source of key/value entries
+     * @param subjVar  var that receives the subject of each document
+     * @param predObjs predicate/object var pairs to bind
+     * @throws QueryEvaluationException declared for callers; not thrown by this constructor itself
+     */
+    public FilterIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter, Var subjVar, List<Map.Entry<Var, Var>> predObjs) throws QueryEvaluationException {
+        super(bindings, iter);
+        this.subjVar = subjVar;
+        this.predObjs = predObjs;
+        for (Map.Entry<Var, Var> predObj : this.predObjs) {
+            //find filtering predicates
+            this.filters.put((URI) predObj.getKey().getValue(), predObj);
+        }
+    }
+
+    /**
+     * @return true while binding sets of the current document are pending, otherwise
+     *         whatever the parent iteration reports
+     */
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        if (document != null || currentResults != null)
+            return true;
+
+        return super.hasNext();
+    }
+
+    /**
+     * Returns the next pending binding set, reading and expanding a new document
+     * first when the previous one has been fully consumed.
+     *
+     * @throws QueryEvaluationException if the document cannot be read or expanded
+     */
+    @Override
+    public BindingSet next() throws QueryEvaluationException {
+        try {
+            if (document == null) {
+                document = nextDocument();
+            }
+            if (currentResults == null) {
+                currentResults = populateBindingSet(document, subjVar, this.predObjs);
+            }
+            BindingSet bs = currentResults.get(currentResultsIndex);
+            currentResultsIndex++;
+            if (currentResultsIndex >= currentResults.size()) {
+                // current document exhausted: reset so the next call reads a new one
+                currentResults = null;
+                currentResultsIndex = 0;
+                document = null;
+            }
+            return bs;
+        } catch (QueryEvaluationException e) {
+            // already the right type; wrapping it again would only bury the message
+            throw e;
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    /**
+     * Matches the statements of a document against the configured predicate/object pairs.
+     *
+     * @param document statements to inspect
+     * @return for every statement that matches, in document order, the first pair whose
+     *         predicate var value equals the statement's predicate; empty when nothing matches
+     * @throws QueryEvaluationException declared for overriding classes; not thrown here
+     */
+    protected List<Map.Entry<Var, Var>> filter(List<Statement> document) throws QueryEvaluationException {
+        List<Map.Entry<Var, Var>> foundIn = new ArrayList<Map.Entry<Var, Var>>();
+
+        for (Statement st : document) {
+            for (Map.Entry<Var, Var> entry : this.predObjs) {
+                if (st.getPredicate().equals(entry.getKey().getValue())) {
+                    foundIn.add(entry);
+                    break;
+                }
+            }
+        }
+        return foundIn;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java
new file mode 100644
index 0000000..ebe23dc
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java
@@ -0,0 +1,54 @@
+package mvm.mmrts.rdf.partition.query.evaluation.select;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.Lists;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class SelectAllIterator
+ * Date: Jul 18, 2011
+ * Time: 12:01:25 PM
+ * <p/>
+ * Returns one binding set per statement: documents are read through
+ * {@code nextDocument()} and each of their statements is bound to the single
+ * predicate/object var pair given at construction.
+ */
+public class SelectAllIterator extends SelectIterator {
+
+    private final List<Map.Entry<Var, Var>> predObj;
+    // Document currently being walked, or null when a new one has to be read.
+    private List<Statement> document = null;
+    // Position of the next statement to return inside the current document.
+    private int index = 0;
+
+    /**
+     * @param bindings bindings every produced binding set starts from
+     * @param iter     source of key/value entries
+     * @param predVar  var that receives each statement's predicate
+     * @param objVar   var that receives each statement's object
+     * @throws QueryEvaluationException declared for callers; not thrown by this constructor itself
+     */
+    public SelectAllIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter, Var predVar, Var objVar) throws QueryEvaluationException {
+        super(bindings, iter);
+        predObj = Lists.<Map.Entry<Var, Var>>newArrayList();
+        predObj.add(new HashMap.SimpleEntry<Var, Var>(predVar, objVar));
+    }
+
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        return super.hasNext() || document != null;
+    }
+
+    /**
+     * Returns the bindings for the next statement, loading the next document when
+     * the current one has been fully returned.
+     *
+     * @throws QueryEvaluationException if no statement is left or a document cannot be read
+     */
+    @Override
+    public BindingSet next() throws QueryEvaluationException {
+        try {
+            // Skip over empty documents so that an empty one cannot wedge the iterator.
+            while (document == null || document.isEmpty()) {
+                if (!super.hasNext()) {
+                    document = null;
+                    throw new java.util.NoSuchElementException("No more statements to select");
+                }
+                document = nextDocument();
+                // Restart at the first statement of the new document; without this reset
+                // the offset left over from the previous document was reused.
+                index = 0;
+            }
+            Statement st = document.get(index);
+            index++;
+            if (index >= document.size()) {
+                document = null;
+            }
+            return populateBindingSet(st, predObj);
+        } catch (QueryEvaluationException e) {
+            // already the right type; do not wrap it a second time
+            throw e;
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java
new file mode 100644
index 0000000..e6efa2b
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java
@@ -0,0 +1,270 @@
+package mvm.mmrts.rdf.partition.query.evaluation.select;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.io.ByteStreams;
+import info.aduna.iteration.CloseableIteration;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import ss.cloudbase.core.iterators.filter.CBConverter;
+
+import java.util.*;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.readStatement;
+
+/**
+ * Class SelectIterator
+ * Date: Jul 18, 2011
+ * Time: 12:01:25 PM
+ * <p/>
+ * Base class for iterations that read key/value entries from an underlying iterator,
+ * group them into documents (lists of statements) and offer helpers that turn those
+ * statements into binding sets. Subclasses implement {@code next()}.
+ */
+public abstract class SelectIterator implements CloseableIteration<BindingSet, QueryEvaluationException> {
+
+    protected PeekingIterator<Map.Entry<Key, Value>> iter;
+    protected BindingSet bindings;
+    protected CBConverter converter = new CBConverter();
+
+    // Latches to false the first time the underlying iterator reports no more entries.
+    private boolean hasNext = true;
+
+    /**
+     * @param bindings bindings every produced binding set starts from
+     * @param iter     source of key/value entries; wrapped so entries can be peeked
+     */
+    public SelectIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter) {
+        this.bindings = bindings;
+        this.iter = Iterators.peekingIterator(iter);
+        converter.init(Collections.singletonMap(CBConverter.OPTION_VALUE_DELIMITER, VALUE_DELIMITER));
+    }
+
+    /** Does nothing; this class holds no resource of its own. */
+    @Override
+    public void close() throws QueryEvaluationException {
+
+    }
+
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        return statefulHasNext();
+    }
+
+    /**
+     * @return true while the underlying iterator has entries; once it has reported
+     *         false the answer stays false and the iterator is not asked again
+     */
+    protected boolean statefulHasNext() {
+        hasNext = hasNext && iter.hasNext();
+        return hasNext;
+    }
+
+    /**
+     * Reads the next document. An entry with an empty value is delegated to
+     * {@link #nextNonAggregateDocument()}. Any other entry is treated as an aggregate:
+     * the subject is read from the column qualifier, and every key of the map produced
+     * by the converter is split on {@code FAMILY_DELIM_STR} into a predicate URI and a
+     * serialized object value.
+     *
+     * @return the statements of the next document
+     * @throws QueryEvaluationException if the entry cannot be read or decoded
+     */
+    protected List<Statement> nextDocument() throws QueryEvaluationException {
+        try {
+            Map.Entry<Key, Value> entry = iter.peek();
+            Key key = entry.getKey();
+            Value value = entry.getValue();
+
+            if (value.getSize() == 0) {
+                //not an aggregate document
+                return nextNonAggregateDocument();
+            }
+
+            List<Statement> document = new ArrayList<Statement>();
+
+            org.openrdf.model.Value subj = RdfIO.readValue(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY, FAMILY_DELIM);
+            Map<String, String> map = converter.toMap(entry.getKey(), value);
+            for (Map.Entry<String, String> e : map.entrySet()) {
+                String predObj = e.getKey();
+                String[] split = predObj.split(FAMILY_DELIM_STR);
+                document.add(new StatementImpl((Resource) subj, VALUE_FACTORY.createURI(split[0]), RdfIO.readValue(ByteStreams.newDataInput(split[1].getBytes()), VALUE_FACTORY, FAMILY_DELIM)));
+            }
+            // the entry was only peeked so far; consume it now that it has been decoded
+            iter.next();
+            return document;
+        } catch (Exception e) {
+            throw new QueryEvaluationException("Error retrieving document", e);
+        }
+    }
+
+    /**
+     * Collects the run of consecutive entries whose statements share the subject of
+     * the next entry. The first entry with a different subject is left unconsumed.
+     *
+     * @return the collected statements; empty when no entry is left
+     * @throws QueryEvaluationException if an entry cannot be decoded
+     */
+    protected List<Statement> nextNonAggregateDocument() throws QueryEvaluationException {
+        try {
+            List<Statement> document = new ArrayList<Statement>();
+            if (!statefulHasNext())
+                return document;
+            Statement stmt = peekNextStatement();
+            if (stmt == null)
+                return document;
+
+            Resource subject = stmt.getSubject();
+            Resource current = subject;
+            document.add(stmt);
+            while ((current.equals(subject) && statefulHasNext())) {
+                // consume the entry whose statement was just added, then look at the one after it
+                advance();
+                current = subject;
+                stmt = peekNextStatement();
+                if (stmt != null) {
+                    subject = stmt.getSubject();
+                    if (subject.equals(current))
+                        document.add(stmt);
+                } else
+                    subject = null; // no more entries: makes the loop condition fail
+            }
+            return document;
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    /**
+     * Decodes the statement of the next entry without consuming the entry.
+     *
+     * @return the statement read from the entry's column qualifier, or null when no entry is left
+     * @throws Exception if the statement cannot be read
+     */
+    protected Statement peekNextStatement() throws Exception {
+        if (!statefulHasNext())
+            return null;
+        Map.Entry<Key, Value> entry = iter.peek();
+        Key key = entry.getKey();
+        if (DOC.equals(key.getColumnFamily()))
+            return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY);
+        else
+            return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY, false);
+    }
+
+    /** Consumes the next entry of the underlying iterator. */
+    protected void advance() throws Exception {
+        iter.next();
+    }
+
+    /**
+     * Consumes the next entry of the underlying iterator.
+     * <p/>
+     * NOTE(review): this skips an entry instead of removing the last returned element;
+     * confirm whether any caller relies on it before changing it.
+     */
+    @Override
+    public void remove() throws QueryEvaluationException {
+        iter.next();
+    }
+
+    /**
+     * Builds a binding set for a single statement.
+     *
+     * @param st         statement supplying the predicate and object values
+     * @param predObjVar predicate/object var pairs; null vars are ignored
+     * @return a copy of the base bindings in which every var name that was not yet
+     *         bound is bound to the statement's predicate or object
+     */
+    protected BindingSet populateBindingSet(Statement st, List<Map.Entry<Var, Var>> predObjVar) {
+        QueryBindingSet result = new QueryBindingSet(bindings);
+        for (Map.Entry<Var, Var> entry : predObjVar) {
+            Var predVar = entry.getKey();
+            Var objVar = entry.getValue();
+            if (predVar != null && !result.hasBinding(predVar.getName()))
+                result.addBinding(predVar.getName(), st.getPredicate());
+            if (objVar != null && !result.hasBinding(objVar.getName()))
+                result.addBinding(objVar.getName(), st.getObject());
+        }
+        return result;
+    }
+
+    /**
+     * Builds the binding sets for a whole document. The result starts as one binding
+     * set (the base bindings plus the subject of the first statement, when there is
+     * one) and is then expanded pair by pair with the statements whose predicate
+     * equals the pair's predicate value. Pairs whose predicate var is null or has no
+     * value are skipped.
+     *
+     * @param document   statements of one document
+     * @param subjVar    var that receives the subject; may be null
+     * @param predObjVar predicate/object var pairs to bind
+     * @return the binding sets; always contains at least one element
+     */
+    protected List<QueryBindingSet> populateBindingSet(List<Statement> document, Var subjVar, List<Map.Entry<Var, Var>> predObjVar) {
+        //convert document to a multimap
+        Multimap<URI, Statement> docMap = ArrayListMultimap.create();
+        for (Statement st : document) {
+            docMap.put(st.getPredicate(), st);
+        }
+
+        List<QueryBindingSet> results = new ArrayList<QueryBindingSet>();
+        QueryBindingSet bs0 = new QueryBindingSet(bindings);
+
+        if (document.size() > 0) {
+            Statement stmt = document.get(0);
+            if (subjVar != null && !bs0.hasBinding(subjVar.getName())) {
+                bs0.addBinding(subjVar.getName(), stmt.getSubject());
+            }
+        }
+        results.add(bs0);
+
+        for (Map.Entry<Var, Var> entry : predObjVar) {
+            Var predVar = entry.getKey();
+            Var objVar = entry.getValue();
+
+            if (predVar == null || !predVar.hasValue())
+                continue;
+            Collection<Statement> predSts = docMap.get((URI) predVar.getValue());
+
+            populateBindingSets(results, predVar, objVar, predSts);
+        }
+        return results;
+    }
+
+    /**
+     * Expands {@code results} in place with the given statements. The first statement
+     * is bound into the existing binding sets; for every further statement a copy of
+     * the sets that existed on entry is appended and that statement is bound into the
+     * copies. Does nothing when a var is null or there are no statements.
+     */
+    private void populateBindingSets(List<QueryBindingSet> results, Var predVar, Var objVar, Collection<Statement> stmts) {
+        if (predVar == null || objVar == null || stmts == null || stmts.size() == 0)
+            return;
+
+        // Snapshot of the list; the elements are the same objects as in results.
+        List<QueryBindingSet> copyOf = new ArrayList<QueryBindingSet>(results);
+
+        // Entries at position >= i are the copies appended for the current statement
+        // and get their bindings overwritten.
+        int i = copyOf.size();
+        int j = 0;
+        for (Iterator<Statement> iter = stmts.iterator(); iter.hasNext();) {
+            Statement st = iter.next();
+            int k = 0;
+            for (QueryBindingSet result : results) {
+                if (!result.hasBinding(predVar.getName()) || k >= i) {
+                    String name = predVar.getName();
+                    org.openrdf.model.Value val = st.getPredicate();
+                    addBinding(result, name, val);
+                }
+                if (!result.hasBinding(objVar.getName()) || k >= i)
+                    addBinding(result, objVar.getName(), st.getObject());
+                k++;
+            }
+
+            i = copyOf.size() + j * copyOf.size();
+            j++;
+
+            if (iter.hasNext()) {
+                //copy results
+                for (QueryBindingSet copy : copyOf) {
+                    results.add(new QueryBindingSet(copy));
+                }
+            }
+
+        }
+    }
+
+    /** Binds {@code name} to {@code val}, replacing any binding already present under that name. */
+    private void addBinding(QueryBindingSet result, String name, org.openrdf.model.Value val) {
+        if (result.hasBinding(name))
+            result.removeBinding(name);
+        result.addBinding(name, val);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java
new file mode 100644
index 0000000..fe0fca2
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java
@@ -0,0 +1,40 @@
+//package mvm.mmrts.rdf.partition.query.evaluation.select;
+//
+//import cloudbase.core.data.Key;
+//import cloudbase.core.data.Value;
+//import org.openrdf.model.Statement;
+//import org.openrdf.query.BindingSet;
+//import org.openrdf.query.QueryEvaluationException;
+//import org.openrdf.query.algebra.Var;
+//import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+//
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//
+///**
+// * Class SubjectSelectIterator
+// * Date: Jul 18, 2011
+// * Time: 3:38:16 PM
+// */
+//public class SubjectSelectIterator extends SelectIterator {
+//
+//    private Var subjVar;
+//    private List<Map.Entry<Var, Var>> select;
+//
+//    public SubjectSelectIterator(BindingSet bindings, 
Iterator<Map.Entry<Key, Value>> iter, Var subjVar, List<Map.Entry<Var, Var>> 
select) {
+//        super(bindings, iter);
+//        this.subjVar = subjVar;
+//        this.select = select;
+//    }
+//
+//    @Override
+//    public BindingSet next() throws QueryEvaluationException {
+//        List<Statement> document = nextDocument();
+//        if(document.size() != 6) {
+//            System.out.println("here");
+//        }
+//        return populateBindingSet(document, subjVar, this.select);
+//
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java
new file mode 100644
index 0000000..f1e6c74
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java
@@ -0,0 +1,107 @@
+package mvm.mmrts.rdf.partition.query.evaluation.select.utils;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.io.ByteStreams;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+
+import java.util.*;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.DOC;
+import static mvm.mmrts.rdf.partition.PartitionConstants.VALUE_FACTORY;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.readStatement;
+
+/**
+ * This iterator will seek forward in the underlying BatchScanner Iterator and 
group
+ * statements with the same subject.  This guards against the fact that the 
BatchScanner can return
+ * statements out of order.
+ * <br/>
+ * TODO: Not the best solution.
+ * Class DocumentIterator
+ * Date: Aug 29, 2011
+ * Time: 4:09:16 PM
+ */
+public class DocumentIterator implements Iterator<List<Statement>> {
+
+    public static final int BATCH_SIZE = 1000;
+
+    private int batchSize = BATCH_SIZE; //will hold up to 100 subject documents
+    /**
+     * TODO: Check performance against other multi maps
+     */
+    private ListMultimap<Resource, Statement> documents = 
ArrayListMultimap.create();
+    //TODO: Hate having to keep track of this, expensive to constantly check 
the "contains"
+    /**
+     * We keep track of a queue of subjects, so that the first one in will 
most likely have all of its document
+     * in our batch before popping. This assumes also that the documents won't 
get larger than 1000 at the most.
+     */
+    private LinkedList<Resource> subjects = new LinkedList<Resource>();
+
+    private Iterator<Map.Entry<Key, Value>> iter;
+    private boolean hasNext = true;
+
+    public DocumentIterator(Iterator<Map.Entry<Key, Value>> iter) {
+        this(iter, BATCH_SIZE);
+    }
+
+    public DocumentIterator(Iterator<Map.Entry<Key, Value>> iter, int 
batchSize) {
+        this.iter = iter;
+        this.batchSize = batchSize;
+        fillDocumentMap();
+    }
+
+    protected void fillDocumentMap() {
+        try {
+            while ((documents.size() < batchSize) && statefulHasNext()) {
+                Statement stmt = nextStatement();
+                Resource subj = stmt.getSubject();
+                documents.put(subj, stmt);
+                if (!subjects.contains(subj))
+                    subjects.add(subj);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected boolean statefulHasNext() {
+        hasNext = iter.hasNext() && hasNext;
+        return hasNext;
+    }
+
+    protected Statement nextStatement() throws Exception {
+        Map.Entry<Key, Value> entry = iter.next();
+        Key key = entry.getKey();
+        if (DOC.equals(key.getColumnFamily()))
+            return 
readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), 
VALUE_FACTORY);
+        else
+            return 
readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), 
VALUE_FACTORY, false);
+    }
+
+    @Override
+    public boolean hasNext() {
+        fillDocumentMap();
+        return documents.size() > 0;
+    }
+
+    @Override
+    public List<Statement> next() {
+        fillDocumentMap();
+        if (subjects.size() > 0) {
+            Resource subject = subjects.pop();
+            subjects.remove(subject);
+            List<Statement> doc = documents.removeAll(subject);
+            System.out.println(doc);
+            return doc;
+        }
+        return null;
+    }
+
+    @Override
+    public void remove() {
+        this.next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java
new file mode 100644
index 0000000..378606c
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java
@@ -0,0 +1,167 @@
+package mvm.mmrts.rdf.partition.query.operators;
+
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import org.openrdf.query.algebra.QueryModelNodeBase;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
+
+import java.util.*;
+
+/**
+ * Here the subject is not null, but there will be a list of
+ * predicate/object paired vars that may or may not be null
+ * <p/>
+ * Query model node that carries the subject var, the predicate/object var pairs and
+ * the optional time-range settings of a lookup.
+ * <p/>
+ * Class ShardSubjectLookup
+ * Date: Jul 14, 2011
+ * Time: 3:32:33 PM
+ */
+public class ShardSubjectLookup extends QueryModelNodeBase implements TupleExpr {
+
+    private Var subject;
+    private List<Map.Entry<Var, Var>> predicateObjectPairs;
+
+    private String timePredicate;
+    private String startTimeRange;
+    private String endTimeRange;
+    private String shardStartTimeRange;
+    private String shardEndTimeRange;
+    private PartitionConstants.TimeType timeType;
+
+    /**
+     * @param subject subject var; a clone of it is stored
+     */
+    public ShardSubjectLookup(Var subject) {
+        this(subject, new ArrayList<Map.Entry<Var, Var>>());
+    }
+
+    /**
+     * @param subject              subject var; a clone of it is stored
+     * @param predicateObjectPairs predicate/object var pairs; the list is copied, the entries are not
+     */
+    public ShardSubjectLookup(Var subject, List<Map.Entry<Var, Var>> predicateObjectPairs) {
+        this.subject = subject.clone();
+        this.predicateObjectPairs = new ArrayList<Map.Entry<Var, Var>>(predicateObjectPairs);
+    }
+
+    @Override
+    public <X extends Exception> void visit(QueryModelVisitor<X> visitor) throws X {
+        visitor.meetOther(this);
+    }
+
+    /**
+     * Visits the subject and then the predicate and object var of every pair.
+     * Null vars are skipped.
+     */
+    @Override
+    public <X extends Exception> void visitChildren(QueryModelVisitor<X> visitor) throws X {
+        if (subject != null) {
+            visitor.meet(subject);
+        }
+        if (predicateObjectPairs == null) {
+            return;
+        }
+        for (Map.Entry<Var, Var> predObj : predicateObjectPairs) {
+            if (predObj.getKey() != null) {
+                visitor.meet(predObj.getKey());
+            }
+            if (predObj.getValue() != null) {
+                visitor.meet(predObj.getValue());
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return getAssuredBindingNames();
+    }
+
+    /**
+     * @return the names of the subject var and of every non-null predicate and object var
+     */
+    @Override
+    public Set<String> getAssuredBindingNames() {
+        Set<String> bindingNames = new HashSet<String>(8);
+
+        if (subject != null) {
+            bindingNames.add(subject.getName());
+        }
+        if (predicateObjectPairs != null) {
+            for (Map.Entry<Var, Var> predObj : predicateObjectPairs) {
+                if (predObj.getKey() != null) {
+                    bindingNames.add(predObj.getKey().getName());
+                }
+                if (predObj.getValue() != null) {
+                    bindingNames.add(predObj.getValue().getName());
+                }
+            }
+        }
+
+        return bindingNames;
+    }
+
+    /** Appends a predicate/object var pair to this lookup. */
+    public void addPredicateObjectPair(Var predicate, Var object) {
+        this.predicateObjectPairs.add(new HashMap.SimpleEntry<Var, Var>(predicate, object));
+    }
+
+    public Var getSubject() {
+        return subject;
+    }
+
+    public void setSubject(Var subject) {
+        this.subject = subject;
+    }
+
+    /** @return the list held by this node, not a copy */
+    public List<Map.Entry<Var, Var>> getPredicateObjectPairs() {
+        return predicateObjectPairs;
+    }
+
+    public void setPredicateObjectPairs(List<Map.Entry<Var, Var>> predicateObjectPairs) {
+        this.predicateObjectPairs = predicateObjectPairs;
+    }
+
+    public String getEndTimeRange() {
+        return endTimeRange;
+    }
+
+    public void setEndTimeRange(String endTimeRange) {
+        this.endTimeRange = endTimeRange;
+    }
+
+    public String getStartTimeRange() {
+        return startTimeRange;
+    }
+
+    public void setStartTimeRange(String startTimeRange) {
+        this.startTimeRange = startTimeRange;
+    }
+
+    public String getTimePredicate() {
+        return timePredicate;
+    }
+
+    public void setTimePredicate(String timePredicate) {
+        this.timePredicate = timePredicate;
+    }
+
+    public PartitionConstants.TimeType getTimeType() {
+        return timeType;
+    }
+
+    public void setTimeType(PartitionConstants.TimeType timeType) {
+        this.timeType = timeType;
+    }
+
+    public String getShardStartTimeRange() {
+        return shardStartTimeRange;
+    }
+
+    public void setShardStartTimeRange(String shardStartTimeRange) {
+        this.shardStartTimeRange = shardStartTimeRange;
+    }
+
+    public String getShardEndTimeRange() {
+        return shardEndTimeRange;
+    }
+
+    public void setShardEndTimeRange(String shardEndTimeRange) {
+        this.shardEndTimeRange = shardEndTimeRange;
+    }
+
+    /**
+     * @return a copy whose subject var, pair list and pair vars are cloned as well, so
+     *         that changing the copy's vars or pairs does not affect this node
+     */
+    public ShardSubjectLookup clone() {
+        ShardSubjectLookup clone = (ShardSubjectLookup) super.clone();
+        clone.subject = (subject == null) ? null : subject.clone();
+        if (predicateObjectPairs != null) {
+            List<Map.Entry<Var, Var>> pairs = new ArrayList<Map.Entry<Var, Var>>(predicateObjectPairs.size());
+            for (Map.Entry<Var, Var> predObj : predicateObjectPairs) {
+                Var predicate = (predObj.getKey() == null) ? null : predObj.getKey().clone();
+                Var object = (predObj.getValue() == null) ? null : predObj.getValue().clone();
+                pairs.add(new HashMap.SimpleEntry<Var, Var>(predicate, object));
+            }
+            clone.predicateObjectPairs = pairs;
+        }
+        return clone;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof ShardSubjectLookup && super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() ^ "ShardSubjectLookup".hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ShardSubjectLookup{" +
+                "subject=" + subject +
+                ", predicateObjectPairs=" + predicateObjectPairs +
+                ", timePredicate='" + timePredicate + '\'' +
+                ", startTimeRange='" + startTimeRange + '\'' +
+                ", endTimeRange='" + endTimeRange + '\'' +
+                ", timeType=" + timeType +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java
new file mode 100644
index 0000000..304fadf
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java
@@ -0,0 +1,52 @@
+package mvm.mmrts.rdf.partition.shard;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Class DateHashModShardValueGenerator
+ * Date: Jul 6, 2011
+ * Time: 6:29:50 PM
+ */
+public class DateHashModShardValueGenerator implements ShardValueGenerator {
+
+    protected int baseMod = 50;
+
+    protected SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
+    private static final String DATE_SHARD_DELIM = "_";
+
+    public DateHashModShardValueGenerator() {
+    }
+
+    public DateHashModShardValueGenerator(SimpleDateFormat format, int 
baseMod) {
+        this.baseMod = baseMod;
+        this.format = format;
+    }
+
+    @Override
+    public String generateShardValue(Object obj) {
+        return this.generateShardValue(System.currentTimeMillis(), obj);
+    }
+
+    public String generateShardValue(Long date, Object obj) {
+        if (obj == null)
+            return format.format(new Date(date));
+        return format.format(new Date(date)) + DATE_SHARD_DELIM + 
(Math.abs(obj.hashCode() % baseMod));
+    }
+
+    public int getBaseMod() {
+        return baseMod;
+    }
+
+    public void setBaseMod(int baseMod) {
+        this.baseMod = baseMod;
+    }
+
+    public SimpleDateFormat getFormat() {
+        return format;
+    }
+
+    public void setFormat(SimpleDateFormat format) {
+        this.format = format;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/ShardValueGenerator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/ShardValueGenerator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/ShardValueGenerator.java
new file mode 100644
index 0000000..2b1c296
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/ShardValueGenerator.java
@@ -0,0 +1,12 @@
+package mvm.mmrts.rdf.partition.shard;
+
+/**
+ * Interface ShardValueGenerator
+ * Date: Jul 6, 2011
+ * Time: 6:29:08 PM
+ * <p/>
+ * Contract for producing the shard value string associated with an object.
+ */
+public interface ShardValueGenerator {
+
+    /**
+     * Produces the shard value for the given object.
+     *
+     * @param obj the object to derive a shard value from
+     *            (NOTE(review): null handling is up to the implementation)
+     * @return the shard value as a string
+     */
+    public String generateShardValue(Object obj);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java
new file mode 100644
index 0000000..966f546
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java
@@ -0,0 +1,30 @@
+package mvm.mmrts.rdf.partition.utils;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.StatementImpl;
+
+/**
+ * Class ContextsStatementImpl
+ * Date: Aug 5, 2011
+ * Time: 7:48:56 AM
+ * <p/>
+ * A statement that keeps an array of contexts rather than a single one.
+ */
+public class ContextsStatementImpl extends StatementImpl {
+    private Resource[] contexts;
+
+    /**
+     * @param subject   statement subject
+     * @param predicate statement predicate
+     * @param object    statement object
+     * @param contexts  contexts attached to the statement; stored as given
+     */
+    public ContextsStatementImpl(Resource subject, URI predicate, Value object, Resource... contexts) {
+        super(subject, predicate, object);
+        this.contexts = contexts;
+    }
+
+    /**
+     * @return the contexts array exactly as passed to the constructor
+     */
+    public Resource[] getContexts() {
+        return this.contexts;
+    }
+
+    /**
+     * @return the first context in the array, or null when the array is
+     *         null or empty
+     */
+    @Override
+    public Resource getContext() {
+        if (contexts == null || contexts.length == 0) {
+            return null;
+        }
+        return contexts[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java
new file mode 100644
index 0000000..2b83c6b
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java
@@ -0,0 +1,39 @@
+package mvm.mmrts.rdf.partition.utils;
+
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class CountPredObjPairs
+ * Date: Apr 12, 2011
+ * Time: 1:31:05 PM
+ * <p/>
+ * Scores a tuple expression: the score starts at 100 and drops by one for
+ * each bound part of the expression.
+ */
+public class CountPredObjPairs {
+
+    public CountPredObjPairs() {
+    }
+
+    /**
+     * Computes the score for {@code expr}.
+     * <p/>
+     * For a {@link ShardSubjectLookup} one point is removed for a bound
+     * subject, one for a non-null time predicate, and one per
+     * predicate/object pair whose two vars both have values. For a
+     * {@link StatementPattern} one point is removed for a bound subject and
+     * one when predicate and object are both bound. Any other expression
+     * scores 100.
+     *
+     * @param expr expression to score
+     * @return the resulting score
+     */
+    public double getCount(TupleExpr expr) {
+        int remaining = 100;
+        if (expr instanceof ShardSubjectLookup) {
+            ShardSubjectLookup shardLookup = (ShardSubjectLookup) expr;
+            List<Map.Entry<Var, Var>> pairs = shardLookup.getPredicateObjectPairs();
+            if (shardLookup.getSubject().hasValue()) {
+                remaining--;
+            }
+            if (shardLookup.getTimePredicate() != null) {
+                remaining--;
+            }
+            for (Map.Entry<Var, Var> pair : pairs) {
+                if (pair.getValue().hasValue() && pair.getKey().hasValue()) {
+                    remaining--;
+                }
+            }
+        } else if (expr instanceof StatementPattern) {
+            StatementPattern pattern = (StatementPattern) expr;
+            if (pattern.getSubjectVar().hasValue()) {
+                remaining--;
+            }
+            if (pattern.getPredicateVar().hasValue() && pattern.getObjectVar().hasValue()) {
+                remaining--;
+            }
+        }
+        return remaining;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java
new file mode 100644
index 0000000..3e3b024
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java
@@ -0,0 +1,9 @@
+package mvm.mmrts.rdf.partition.utils;
+
+/**
+ * Class PartitionUtils
+ * Date: Jul 6, 2011
+ * Time: 11:49:11 AM
+ * <p/>
+ * Currently declares no members.
+ */
+public class PartitionUtils {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java
new file mode 100644
index 0000000..b7d6ec8
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java
@@ -0,0 +1,166 @@
+package mvm.mmrts.rdf.partition.utils;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import mvm.mmrts.rdf.partition.InvalidValueTypeMarkerRuntimeException;
+import org.openrdf.model.*;
+import org.openrdf.model.impl.StatementImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+/**
+ * Class RdfIO
+ * Date: Jul 6, 2011
+ * Time: 12:13:18 PM
+ * <p/>
+ * Serializes RDF values and statements into delimited byte arrays and reads
+ * them back. Each value is written as a one-byte type marker followed by its
+ * text. Text is always encoded and decoded as UTF-8 so the produced bytes do
+ * not depend on the default charset of the JVM doing the reading or writing.
+ */
+public class RdfIO {
+
+    /** Charset name used for every text-to-bytes and bytes-to-text conversion. */
+    private static final String ENCODING = "UTF-8";
+
+    /**
+     * Serializes a statement.
+     * <p/>
+     * Document layout: subject, FAMILY_DELIM, predicate, FAMILY_DELIM, object.
+     * Index layout: predicate, INDEX_DELIM, object, FAMILY_DELIM, subject.
+     * No delimiter is written after the last value.
+     *
+     * @param statement statement to write; null yields an empty array
+     * @param document  true for the document layout, false for the index layout
+     * @return the serialized bytes
+     * @throws IOException if a value cannot be encoded
+     */
+    public static byte[] writeStatement(Statement statement, boolean document) throws IOException {
+        if (statement == null)
+            return new byte[]{};
+        ByteArrayDataOutput dataOut = ByteStreams.newDataOutput();
+
+        if (document) {
+            writeValue(dataOut, statement.getSubject());
+            dataOut.writeByte(FAMILY_DELIM);
+            writeValue(dataOut, statement.getPredicate());
+            dataOut.writeByte(FAMILY_DELIM);
+            writeValue(dataOut, statement.getObject());
+        } else {
+            //index
+            writeValue(dataOut, statement.getPredicate());
+            dataOut.writeByte(INDEX_DELIM);
+            writeValue(dataOut, statement.getObject());
+            dataOut.writeByte(FAMILY_DELIM);
+            writeValue(dataOut, statement.getSubject());
+        }
+
+        return dataOut.toByteArray();
+    }
+
+    /**
+     * Serializes a single value into a fresh byte array.
+     *
+     * @param value value to write; must not be null
+     * @return the serialized bytes
+     * @throws IOException if the value cannot be encoded
+     */
+    public static byte[] writeValue(Value value) throws IOException {
+        ByteArrayDataOutput output = ByteStreams.newDataOutput();
+        writeValue(output, value);
+        return output.toByteArray();
+    }
+
+    /**
+     * Appends a value to {@code dataOut} as a type marker followed by its text.
+     * Datatyped and language literals repeat their marker after the label and
+     * then write the datatype URI (as a nested value) or the language tag.
+     *
+     * @param dataOut destination; must not be null
+     * @param value   value to write; must not be null
+     * @throws IllegalArgumentException if an argument is null or the value is
+     *                                  not a URI, BNode or Literal
+     * @throws IOException              if the text cannot be encoded
+     */
+    public static void writeValue(ByteArrayDataOutput dataOut, Value value) throws IOException {
+        if (value == null || dataOut == null)
+            throw new IllegalArgumentException("Arguments cannot be null");
+        if (value instanceof URI) {
+            dataOut.writeByte(URI_MARKER);
+            dataOut.write(value.toString().getBytes(ENCODING));
+        } else if (value instanceof BNode) {
+            dataOut.writeByte(BNODE_MARKER);
+            dataOut.write(((BNode) value).getID().getBytes(ENCODING));
+        } else if (value instanceof Literal) {
+            Literal lit = (Literal) value;
+
+            String label = lit.getLabel();
+            String language = lit.getLanguage();
+            URI datatype = lit.getDatatype();
+
+            if (datatype != null) {
+                dataOut.writeByte(DATATYPE_LITERAL_MARKER);
+                dataOut.write(label.getBytes(ENCODING));
+                dataOut.writeByte(DATATYPE_LITERAL_MARKER);
+                writeValue(dataOut, datatype);
+            } else if (language != null) {
+                dataOut.writeByte(LANG_LITERAL_MARKER);
+                dataOut.write(label.getBytes(ENCODING));
+                dataOut.writeByte(LANG_LITERAL_MARKER);
+                dataOut.write(language.getBytes(ENCODING));
+            } else {
+                dataOut.writeByte(PLAIN_LITERAL_MARKER);
+                dataOut.write(label.getBytes(ENCODING));
+            }
+        } else {
+            throw new IllegalArgumentException("unexpected value type: "
+                    + value.getClass());
+        }
+    }
+
+    /**
+     * Reads a statement stored in the document layout.
+     *
+     * @param dataIn source positioned at the start of the statement
+     * @param vf     factory used to create the values
+     * @return the statement
+     * @throws IOException if a type marker cannot be read
+     */
+    public static Statement readStatement(ByteArrayDataInput dataIn, ValueFactory vf)
+            throws IOException {
+
+        return readStatement(dataIn, vf, true);
+    }
+
+    //TODO: This could be faster somehow, more efficient
+
+    /**
+     * Collects bytes from {@code dataIn} until {@code delim} is read or the
+     * input is exhausted. The delimiter is consumed but not returned.
+     */
+    private static byte[] readFully(ByteArrayDataInput dataIn, byte delim) {
+        ByteArrayDataOutput output = ByteStreams.newDataOutput();
+        try {
+            byte curr;
+            while ((curr = dataIn.readByte()) != delim) {
+                output.writeByte(curr);
+            }
+        } catch (IllegalStateException e) {
+            //end of array
+        }
+        return output.toByteArray();
+    }
+
+    /**
+     * Reads a statement in either layout.
+     *
+     * @param dataIn source positioned at the start of the statement
+     * @param vf     factory used to create the values
+     * @param doc    true for the document layout, false for the index layout
+     * @return the statement
+     * @throws IOException if a type marker cannot be read
+     */
+    public static Statement readStatement(ByteArrayDataInput dataIn, ValueFactory vf, boolean doc)
+            throws IOException {
+
+        //doc order: subject/0predicate/0object
+        //index order: predicate/1object/0subject
+        // Only the first value of the index layout ends with INDEX_DELIM;
+        // every other value ends with FAMILY_DELIM (or the end of input).
+        byte delim = (doc) ? FAMILY_DELIM : INDEX_DELIM;
+        List<Value> values = new ArrayList<Value>();
+        while (values.size() < 3) {
+            Value addThis = readValue(dataIn, vf, delim);
+            values.add(addThis);
+            delim = FAMILY_DELIM;
+        }
+
+        if (doc)
+            return new StatementImpl((Resource) values.get(0), (URI) values.get(1), values.get(2));
+        else
+            return new StatementImpl((Resource) values.get(2), (URI) values.get(0), values.get(1));
+    }
+
+    /**
+     * Reads one value: a type marker followed by text terminated by
+     * {@code delim} or the end of input.
+     *
+     * @param dataIn source positioned at the value's type marker
+     * @param vf     factory used to create the value
+     * @param delim  byte that terminates the value
+     * @return the decoded value
+     * @throws IOException                            if the type marker cannot be read
+     * @throws IllegalArgumentException               if a datatyped literal is not
+     *                                                followed by a URI
+     * @throws InvalidValueTypeMarkerRuntimeException if the marker is unknown
+     */
+    public static Value readValue(ByteArrayDataInput dataIn, ValueFactory vf, byte delim) throws IOException {
+        int valueTypeMarker;
+        try {
+            valueTypeMarker = dataIn.readByte();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        Value addThis = null;
+        if (valueTypeMarker == URI_MARKER) {
+            byte[] bytes = readFully(dataIn, delim);
+            addThis = vf.createURI(new String(bytes, ENCODING));
+        } else if (valueTypeMarker == BNODE_MARKER) {
+            byte[] bytes = readFully(dataIn, delim);
+            addThis = vf.createBNode(new String(bytes, ENCODING));
+        } else if (valueTypeMarker == PLAIN_LITERAL_MARKER) {
+            byte[] bytes = readFully(dataIn, delim);
+            addThis = vf.createLiteral(new String(bytes, ENCODING));
+        } else if (valueTypeMarker == LANG_LITERAL_MARKER) {
+            // label runs up to the repeated marker, the language tag up to delim
+            byte[] bytes = readFully(dataIn, (byte) LANG_LITERAL_MARKER);
+            String label = new String(bytes, ENCODING);
+            bytes = readFully(dataIn, delim);
+            addThis = vf.createLiteral(label, new String(bytes, ENCODING));
+        } else if (valueTypeMarker == DATATYPE_LITERAL_MARKER) {
+            // label runs up to the repeated marker, then a nested URI value follows
+            byte[] bytes = readFully(dataIn, (byte) DATATYPE_LITERAL_MARKER);
+            String label_s = new String(bytes, ENCODING);
+            if (URI_MARKER != dataIn.readByte()) {
+                throw new IllegalArgumentException("Expected a URI datatype here");
+            }
+            bytes = readFully(dataIn, delim);
+            addThis = vf.createLiteral(label_s, vf.createURI(new String(bytes, ENCODING)));
+        } else {
+            throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: "
+                    + valueTypeMarker);
+        }
+        return addThis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec 
b/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec
new file mode 100644
index 0000000..74ddbe2
--- /dev/null
+++ b/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec
@@ -0,0 +1,4 @@
+createtable rdfShardIndex
+createtable rdfPartition
+config -t rdfPartition -s table.split.threshold=3G
+config -t rdfPartition -s table.compaction.major.ratio=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java
 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java
new file mode 100644
index 0000000..d5c082a
--- /dev/null
+++ 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java
@@ -0,0 +1,79 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepository;
+
+import javax.xml.datatype.DatatypeFactory;
+
+/**
+ * Manual loader that writes sample statements for subject {@code uuidAuth1}
+ * into the {@code rdfPartition} table. The statements are added under
+ * contexts A, B and C, which the installed {@link ContextColVisConverter}
+ * converts into column visibilities.
+ */
+public class LoadPartitionData {
+
+    public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len
+    public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
+
+    static ValueFactory vf = ValueFactoryImpl.getInstance();
+
+    /**
+     * Connects to the hard-coded instance, loads the sample statements and
+     * commits them. Any failure is printed to standard error.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        try {
+
+            final PartitionSail store = new PartitionSail(
+                    new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()),
+                    "rdfPartition");
+            store.setContextColVisConverter(new ContextColVisConverter() {
+
+                /**
+                 * Joins the local names of the URI contexts with "|".
+                 * Contexts that are not URIs are skipped; null input yields null.
+                 */
+                @Override
+                public ColumnVisibility convertContexts(Resource... contexts) {
+                    if (contexts == null) {
+                        return null;
+                    }
+                    StringBuilder sb = new StringBuilder();
+                    for (Resource context : contexts) {
+                        if (context instanceof URI) {
+                            // separate only names that were actually appended, so a
+                            // skipped non-URI context cannot leave a dangling "|"
+                            if (sb.length() > 0) {
+                                sb.append("|");
+                            }
+                            sb.append(((URI) context).getLocalName());
+                        }
+                    }
+                    return new ColumnVisibility(sb.toString());
+                }
+            });
+            Repository myRepository = new SailRepository(store);
+            myRepository.initialize();
+            try {
+                RepositoryConnection conn = myRepository.getConnection();
+                try {
+                    URI A = vf.createURI("urn:colvis#A");
+                    URI B = vf.createURI("urn:colvis#B");
+                    URI C = vf.createURI("urn:colvis#C");
+
+                    String uuid = "uuidAuth1";
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(RDF_NS, "type"), vf.createURI(NAMESPACE, "Created")), A, B, C);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "createdItem"), vf.createURI(NAMESPACE, "objectUuid1")), A, B);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "performedBy"), vf.createURI("urn:system:A")), A, B);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "stringLit"), vf.createLiteral("stringLit")), A);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "performedAt"),
+                            vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 0, 0, 0, 0))), B, C);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "reportedAt"),
+                            vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 1, 0, 0, 0))), C);
+                    conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                            vf.createURI(NAMESPACE, "booleanLit"), vf.createLiteral(true)));
+
+                    conn.commit();
+                } finally {
+                    // release the connection even when an add or the commit fails
+                    conn.close();
+                }
+            } finally {
+                myRepository.shutDown();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData2.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData2.java
 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData2.java
new file mode 100644
index 0000000..29682ad
--- /dev/null
+++ 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData2.java
@@ -0,0 +1,69 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepository;
+
+/**
+ * Manual loader that writes one sample statement into the
+ * {@code rdfPartition} table, using a {@link DateHashModShardValueGenerator}
+ * whose modulus is set to 10.
+ */
+public class LoadPartitionData2 {
+
+    public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len
+    public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
+
+    static ValueFactory vf = ValueFactoryImpl.getInstance();
+
+    /**
+     * Connects to the hard-coded instance, adds the sample statement and
+     * commits it. Any failure is printed to standard error.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        try {
+
+            DateHashModShardValueGenerator gen = new DateHashModShardValueGenerator();
+            gen.setBaseMod(10);
+            final PartitionSail store = new PartitionSail(
+                    new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()),
+                    "rdfPartition", gen);
+            store.setContextColVisConverter(new ContextColVisConverter() {
+
+                /**
+                 * Joins the local names of the URI contexts with "|".
+                 * Contexts that are not URIs are skipped; null input yields null.
+                 */
+                @Override
+                public ColumnVisibility convertContexts(Resource... contexts) {
+                    if (contexts == null) {
+                        return null;
+                    }
+                    StringBuilder sb = new StringBuilder();
+                    for (Resource context : contexts) {
+                        if (context instanceof URI) {
+                            // separate only names that were actually appended, so a
+                            // skipped non-URI context cannot leave a dangling "|"
+                            if (sb.length() > 0) {
+                                sb.append("|");
+                            }
+                            sb.append(((URI) context).getLocalName());
+                        }
+                    }
+                    return new ColumnVisibility(sb.toString());
+                }
+            });
+            Repository myRepository = new SailRepository(store);
+            myRepository.initialize();
+            try {
+                RepositoryConnection conn = myRepository.getConnection();
+                try {
+                    conn.add(new StatementImpl(
+                            vf.createURI("http://www.Department0.University0.edu/GraduateStudent44"),
+                            vf.createURI("urn:lubm:test#specific"),
+                            vf.createURI("urn:lubm:test#value")));
+
+                    conn.commit();
+                } finally {
+                    // release the connection even when the add or the commit fails
+                    conn.close();
+                }
+            } finally {
+                myRepository.shutDown();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadSampleData.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadSampleData.java
 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadSampleData.java
new file mode 100644
index 0000000..4e86491
--- /dev/null
+++ 
b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadSampleData.java
@@ -0,0 +1,64 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepository;
+
+import javax.xml.datatype.DatatypeFactory;
+
+/**
+ * Manual loader that writes the same seven sample statements for subjects
+ * {@code uuidAuth1} and {@code uuidAuth4} into the {@code partTest} table,
+ * with {@code shardIndexTest} passed as the second table name.
+ */
+public class LoadSampleData {
+
+    public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len
+    public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
+
+    static ValueFactory vf = ValueFactoryImpl.getInstance();
+
+    /**
+     * Connects to the hard-coded instance, loads the sample statements and
+     * commits them. Any failure is printed to standard error.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        try {
+
+            final PartitionSail sail = new PartitionSail(
+                    new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()),
+                    "partTest", "shardIndexTest");
+
+            Repository repository = new SailRepository(sail);
+            repository.initialize();
+
+            RepositoryConnection connection = repository.getConnection();
+
+            addSampleStatements(connection, "uuidAuth1");
+            addSampleStatements(connection, "uuidAuth4");
+
+            connection.commit();
+            connection.close();
+
+            repository.shutDown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Adds the seven sample statements whose subject is
+     * {@code NAMESPACE + uuid} to the connection.
+     */
+    private static void addSampleStatements(RepositoryConnection connection, String uuid) throws Exception {
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(RDF_NS, "type"), vf.createURI(NAMESPACE, "Created")));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "createdItem"), vf.createURI(NAMESPACE, "objectUuid1")));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "performedBy"), vf.createURI("urn:system:A")));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "stringLit"), vf.createLiteral("stringLit")));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "performedAt"),
+                vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 0, 0, 0, 0))));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "reportedAt"),
+                vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 1, 0, 0, 0))));
+        connection.add(new StatementImpl(vf.createURI(NAMESPACE, uuid),
+                vf.createURI(NAMESPACE, "booleanLit"), vf.createLiteral(true)));
+    }
+
+}

Reply via email to