http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
new file mode 100644
index 0000000..38c9ea5
--- /dev/null
+++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
@@ -0,0 +1,331 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.client.TableNotFoundException;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import mvm.rya.cloudbase.utils.input.CloudbaseBatchScannerInputFormat;
+import mvm.mmrts.rdf.partition.mr.iterators.SortedEncodedRangeIterator;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+import ss.cloudbase.core.iterators.GMDenIntersectingIterator;
+import ss.cloudbase.core.iterators.SortedRangeIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*;
+
+/**
+ * Class SparqlCloudbaseIFTransformer
+ * Date: Sep 1, 2011
+ * Time: 11:28:48 AM
+ */
+public class SparqlCloudbaseIFTransformer {
+
+    protected Job job;
+
+    protected String userName;
+    protected String pwd;
+    protected String instance;
+    protected String zk;
+
+    protected ShardSubjectLookup lookup;
+//    protected Configuration configuration;
+    protected String table;
+
+    protected DateHashModShardValueGenerator generator;
+
+    public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table,
+                                        String userName, String pwd, String instance, String zk) throws QueryEvaluationException {
+        this(lookup, configuration, job, table, userName, pwd, instance, zk, new DateHashModShardValueGenerator());
+    }
+
+    public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table,
+                                        String userName, String pwd, String instance, String zk, DateHashModShardValueGenerator generator) throws QueryEvaluationException {
+        this.lookup = lookup;
+//        this.configuration = configuration;
+        this.table = table;
+        this.job = job;
+        this.userName = userName;
+        this.pwd = pwd;
+        this.instance = instance;
+        this.zk = zk;
+        this.generator = generator;
+
+        this.initialize();
+    }
+
+
+    public void initialize() throws QueryEvaluationException {
+        try {
+            /**
+             * Here we will set up the BatchScanner based on the lookup
+             */
+            Var subject = lookup.getSubject();
+            List<Map.Entry<Var, Var>> where = retrieveWhereClause();
+            List<Map.Entry<Var, Var>> select = retrieveSelectClause();
+
+            //global start-end time
+            long start = job.getConfiguration().getLong(START_BINDING, 0);
+            long end = job.getConfiguration().getLong(END_BINDING, System.currentTimeMillis());
+
+            int whereSize = where.size() + ((!isTimeRange(lookup, job.getConfiguration())) ? 0 : 1);
+
+            if (subject.hasValue()
+                    && where.size() == 0  /* Not using whereSize, because we can set up the TimeRange in the scanner */
+                    && select.size() == 0) {
+                /**
+                 * Case 1: Subject is set, but predicate, object are not.
+                 * Return all for the subject
+                 */
+//                this.scanner = scannerForSubject((URI) subject.getValue());
+//                if (this.scanner == null) {
+//                    this.iter = new EmptyIteration();
+//                    return;
+//                }
+//                Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0);
+//                this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(), predObj.getKey(), predObj.getValue());
+                throw new UnsupportedOperationException("Query Case not supported");
+            } else if (subject.hasValue()
+                    && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */) {
+                /**
+                 * Case 2: Subject is set, and a few predicates are set, but no objects
+                 * Return all, and filter which predicates you are interested in
+                 */
+//                this.scanner = scannerForSubject((URI) subject.getValue());
+//                if (this.scanner == null) {
+//                    this.iter = new EmptyIteration();
+//                    return;
+//                }
+//                this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
+                throw new UnsupportedOperationException("Query Case not supported");
+            } else if (subject.hasValue()
+                    && where.size() >= 1 /* Not using whereSize, because we can set up the TimeRange in the scanner */) {
+                /**
+                 * Case 2a: Subject is set, and a few predicates are set, and one object
+                 * TODO: For now we will ignore the predicate-object filter because we do not know how to query for this
+                 */
+//                this.scanner = scannerForSubject((URI) subject.getValue());
+//                if (this.scanner == null) {
+//                    this.iter = new EmptyIteration();
+//                    return;
+//                }
+//                this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
+                throw new UnsupportedOperationException("Query Case not supported");
+            } else if (!subject.hasValue() && whereSize > 1) {
+                /**
+                 * Case 3: Subject is not set, more than one where clause
+                 */
+                scannerForPredicateObject(lookup, start, end, where);
+                setSelectFilter(subject, select);
+            } else if (!subject.hasValue() && whereSize == 1) {
+                /**
+                 * Case 4: No subject, only one where clause
+                 */
+                Map.Entry<Var, Var> predObj = null;
+                if (where.size() == 1) {
+                    predObj = where.get(0);
+                }
+                scannerForPredicateObject(lookup, start, end, predObj);
+                setSelectFilter(subject, select);
+            } else if (!subject.hasValue() && whereSize == 0 && select.size() > 1) {
+                /**
+                 * Case 5: No subject, no where, more than one select
+                 */
+//                this.scanner = scannerForPredicates(start, end, select);
+//                if (this.scanner == null) {
+//                    this.iter = new EmptyIteration();
+//                    return;
+//                }
+//                this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
+                throw new UnsupportedOperationException("Query Case not supported");
+            } else if (!subject.hasValue() && whereSize == 0 && select.size() == 1) {
+                /**
+                 * Case 5: No subject, no where (just 1 select)
+                 */
+//                cloudbase.core.client.Scanner sc = scannerForPredicate(start, end, (URI) select.get(0).getKey().getValue());
+//                if (sc == null) {
+//                    this.iter = new EmptyIteration();
+//                    return;
+//                }
+//                this.iter = new FilterIterator(this.bindings, sc.iterator(), subject, select);
+                throw new UnsupportedOperationException("Query Case not supported");
+            } else {
+                throw new QueryEvaluationException("Case not supported as of yet");
+            }
+
+        } catch (Exception e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    protected void setSelectFilter(Var subj, List<Map.Entry<Var, Var>> select) {
+        List<String> selectStrs = new ArrayList<String>();
+        for (Map.Entry<Var, Var> entry : select) {
+            Var key = entry.getKey();
+            Var obj = entry.getValue();
+            if (key.hasValue()) {
+                String pred_s = key.getValue().stringValue();
+                selectStrs.add(pred_s);
+                job.getConfiguration().set(pred_s, obj.getName());
+            }
+        }
+        job.getConfiguration().setStrings(SELECT_FILTER, selectStrs.toArray(new String[selectStrs.size()]));
+        job.getConfiguration().set(SUBJECT_NAME, subj.getName());
+    }
+
+    protected List<Map.Entry<Var, Var>> retrieveWhereClause() {
+        List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>();
+        for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
+            Var pred = entry.getKey();
+            Var object = entry.getValue();
+            if (pred.hasValue() && object.hasValue()) {
+                where.add(entry); //TODO: maybe we should clone this?
+            }
+        }
+        return where;
+    }
+
+    protected List<Map.Entry<Var, Var>> retrieveSelectClause() {
+        List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, Var>>();
+        for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
+            Var pred = entry.getKey();
+            Var object = entry.getValue();
+            if (pred.hasValue() && !object.hasValue()) {
+                select.add(entry); //TODO: maybe we should clone this?
+            }
+        }
+        return select;
+    }
+
+    protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs) throws IOException, TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        int extra = 0;
+
+        if (isTimeRange(lookup, job.getConfiguration())) {
+            extra += 1;
+        }
+
+        Text[] queries = new Text[predObjs.size() + extra];
+        for (int i = 0; i < predObjs.size(); i++) {
+            Map.Entry<Var, Var> predObj = predObjs.get(i);
+            ByteArrayDataOutput output = ByteStreams.newDataOutput();
+            writeValue(output, predObj.getKey().getValue());
+            output.write(INDEX_DELIM);
+            writeValue(output, predObj.getValue().getValue());
+            queries[i] = new Text(output.toByteArray());
+        }
+
+        if (isTimeRange(lookup, job.getConfiguration())) {
+            queries[queries.length - 1] = new Text(
+                    GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
+                            getStartTimeRange(lookup, job.getConfiguration())
+                            , true,
+                            getEndTimeRange(lookup, job.getConfiguration()),
+                            true
+                    )
+            );
+        }
+
+        createBatchScannerInputFormat();
+        CloudbaseBatchScannerInputFormat.setIterator(job, 20, GMDenIntersectingIterator.class.getName(), "ii");
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString());
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString());
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries));
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true);
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + "\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD"))
+        );
+        CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton(
+                range
+        ));
+    }
+
+    protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, TableNotFoundException {
+        start = validateFillStartTime(start, lookup);
+        end = validateFillEndTime(end, lookup);
+
+        /**
+         * Need to use GMDen because SortedRange can't serialize non xml characters in range
+         * @see https://issues.apache.org/jira/browse/MAPREDUCE-109
+         */
+        createBatchScannerInputFormat();
+        CloudbaseBatchScannerInputFormat.setIterator(job, 20, SortedEncodedRangeIterator.class.getName(), "ri");
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_DOC_COLF, DOC.toString());
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_COLF, INDEX.toString());
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true);
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true);
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + true);
+
+        String lower, upper = null;
+        if (isTimeRange(lookup, job.getConfiguration())) {
+            lower = getStartTimeRange(lookup, job.getConfiguration());
+            upper = getEndTimeRange(lookup, job.getConfiguration());
+        } else {
+
+            ByteArrayDataOutput output = ByteStreams.newDataOutput();
+            writeValue(output, predObj.getKey().getValue());
+            output.write(INDEX_DELIM);
+            writeValue(output, predObj.getValue().getValue());
+
+            lower = new String(output.toByteArray());
+            upper = lower + "\01";
+        }
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_LOWER_BOUND, SortedEncodedRangeIterator.encode(lower));
+        CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_UPPER_BOUND, SortedEncodedRangeIterator.encode(upper));
+
+        //TODO: Do we add a time predicate to this?
+//        bs.setScanIterators(19, FilteringIterator.class.getName(), "filteringIterator");
+//        bs.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName());
+//        bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, (end - start) + "");
+//        bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, end + "");
+
+        Range range = new Range(
+                new Key(new Text(generator.generateShardValue(start, null) + "\0")),
+                new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD"))
+        );
+        CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton(
+                range
+        ));
+
+    }
+
+    protected void createBatchScannerInputFormat() {
+        job.setInputFormatClass(CloudbaseBatchScannerInputFormat.class);
+        CloudbaseBatchScannerInputFormat.setInputInfo(job, userName, pwd.getBytes(), table, CBConstants.NO_AUTHS); //may need to change these auths sometime soon
+        CloudbaseBatchScannerInputFormat.setZooKeeperInstance(job, instance, zk);
+        job.setMapperClass(KeyValueToMapWrMapper.class);
+        job.setCombinerClass(AggregateTriplesBySubjectCombiner.class);
+        job.setReducerClass(AggregateTriplesBySubjectReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(MapWritable.class);
+        job.setOutputKeyClass(LongWritable.class);
+        job.setOutputValueClass(MapWritable.class);
+
+        job.getConfiguration().set("io.sort.mb", "256");
+        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
+    }
+
+}
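
A note on the shard Range built in both scannerForPredicateObject variants: the scan is bracketed by the generated shard value for the start time suffixed with "\0" and the shard value for the end time suffixed with "\uFFFD", so every partition row that sorts between the two shard strings is covered. A minimal, self-contained sketch of that boundary trick; the date-based shard format below is an assumption standing in for DateHashModShardValueGenerator, which is not part of this diff:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ShardRangeSketch {
        // Hypothetical stand-in for generator.generateShardValue(timestamp, null):
        // a date-derived shard prefix.
        static String generateShardValue(long timestampMs) {
            return new SimpleDateFormat("yyyyMMdd").format(new Date(timestampMs));
        }

        public static void main(String[] args) {
            long start = 0L;                       // default when START_BINDING is unset
            long end = System.currentTimeMillis(); // default when END_BINDING is unset
            // "\0" sorts before any bucket suffix and "\uFFFD" after nearly all of
            // them, so the pair brackets every shard row between the two dates.
            String lowerKey = generateShardValue(start) + "\0";
            String upperKey = generateShardValue(end) + "\uFFFD";
            System.out.println("scan [" + lowerKey + ", " + upperKey + "]");
        }
    }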

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
new file mode 100644
index 0000000..84f83c0
--- /dev/null
+++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
@@ -0,0 +1,12 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+/**
+ * Class SparqlCloudbaseIFTransformerConstants
+ * Date: Sep 1, 2011
+ * Time: 5:01:10 PM
+ */
+public class SparqlCloudbaseIFTransformerConstants {
+    public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.transform.";
+    public static final String SELECT_FILTER = PREFIX + "select";
+    public static final String SUBJECT_NAME = PREFIX + "subject";
+}
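
These keys are the contract between SparqlCloudbaseIFTransformer.setSelectFilter() and whatever reads the job configuration on the MapReduce side (KeyValueToMapWrMapper is referenced above but is not part of this commit, so the reading side sketched here is an assumption):

    import mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical consumer of the keys written by setSelectFilter().
    public class SelectFilterReadSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // What setSelectFilter() writes: predicate URIs under SELECT_FILTER, each
            // predicate URI mapped to its object binding name, plus the subject name.
            conf.setStrings(SparqlCloudbaseIFTransformerConstants.SELECT_FILTER, "urn:namespace#pred");
            conf.set("urn:namespace#pred", "object");
            conf.set(SparqlCloudbaseIFTransformerConstants.SUBJECT_NAME, "subj");

            String subjectBinding = conf.get(SparqlCloudbaseIFTransformerConstants.SUBJECT_NAME);
            for (String pred : conf.getStrings(SparqlCloudbaseIFTransformerConstants.SELECT_FILTER)) {
                System.out.println(pred + " -> ?" + conf.get(pred) + " (subject ?" + subjectBinding + ")");
            }
        }
    }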

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
new file mode 100644
index 0000000..effb9ff
--- /dev/null
+++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
@@ -0,0 +1,33 @@
+package mvm.mmrts.rdf.partition.mr.compat;
+
+import junit.framework.TestCase;
+
+/**
+ * Class ChangeShardDateFormatToolTest
+ * Date: Dec 9, 2011
+ * Time: 10:39:31 AM
+ */
+public class ChangeShardDateFormatToolTest extends TestCase {
+
+    public void testShardDelim() throws Exception {
+        String dateDelim = "-";
+        String shard = "2011-11-01";
+        int shardIndex = shard.lastIndexOf(dateDelim);
+        if (shardIndex == -1)
+            fail();
+        String date = shard.substring(0, shardIndex);
+        shard = shard.substring(shardIndex + 1, shard.length());
+        assertEquals("2011-11", date);
+        assertEquals("01", shard);
+
+        dateDelim = "_";
+        shard = "20111101_33";
+        shardIndex = shard.lastIndexOf(dateDelim);
+        if (shardIndex == -1)
+            fail();
+        date = shard.substring(0, shardIndex);
+        shard = shard.substring(shardIndex + 1, shard.length());
+        assertEquals("20111101", date);
+        assertEquals("33", shard);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
new file mode 100644
index 0000000..c279348
--- /dev/null
+++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
@@ -0,0 +1,80 @@
+package mvm.mmrts.rdf.partition.mr.fileinput;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.data.ColumnUpdate;
+import cloudbase.core.data.Mutation;
+import junit.framework.TestCase;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.zookeeper.ZooKeeper;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Class RdfFileInputToolTest
+ * Date: Aug 8, 2011
+ * Time: 3:22:25 PM
+ */
+public class RdfFileInputToolTest extends TestCase {
+
+    ValueFactory vf = ValueFactoryImpl.getInstance();
+
+    /**
+     * MRUnit for latest mapreduce (0.21 api)
+     * <p/>
+     * 1. Test to see if the bytes overwrite will affect
+     */
+
+    private Mapper<LongWritable, BytesWritable, Text, BytesWritable> mapper = new RdfFileInputToCloudbaseTool.OutSubjStmtMapper();
+    private Reducer<Text, BytesWritable, Text, Mutation> reducer = new RdfFileInputToCloudbaseTool.StatementToMutationReducer();
+    private MapReduceDriver<LongWritable, BytesWritable, Text, BytesWritable, Text, Mutation> driver;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        driver = new MapReduceDriver(mapper, reducer);
+        Configuration conf = new Configuration();
+        conf.set(RdfFileInputToCloudbaseTool.CB_TABLE_PROP, "table");
+        driver.setConfiguration(conf);
+    }
+
+    public void testNormalRun() throws Exception {
+        StatementImpl stmt1 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("object"));
+        StatementImpl stmt2 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("obje"));
+        StatementImpl stmt3 = new StatementImpl(vf.createURI("urn:namespace#subj2"), vf.createURI("urn:namespace#pred"), vf.createLiteral("ob"));
+        List<Pair<Text, Mutation>> pairs = driver.
+                withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt1, true))).
+                withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt2, true))).
+                withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt3, true))).
+                run();
+
+        assertEquals(4, pairs.size());
+
+        ColumnUpdate update = pairs.get(0).getSecond().getUpdates().get(0);
+        assertEquals("event", new String(update.getColumnFamily()));
+        assertEquals("\07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob", new String(update.getColumnQualifier()));
+    }
+
+    public static void main(String[] args) {
+        try {
+            Connector connector = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes());
+            Collection<Text> splits = connector.tableOperations().getSplits("partitionRdf", Integer.MAX_VALUE);
+            System.out.println(splits.size());
+            System.out.println(splits);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
new file mode 100644
index 0000000..bd63f6f
--- /dev/null
+++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
@@ -0,0 +1,20 @@
+package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Class EmbedKeyRangePartitionerTest
+ * Date: Sep 13, 2011
+ * Time: 1:58:28 PM
+ */
+public class EmbedKeyRangePartitionerTest extends TestCase {
+
+    public void testRetrieveEmbedKey() throws Exception {
+        assertEquals(new Text("hello"), 
EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello\1there")));
+        assertEquals(new Text("h"), 
EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("h\1there")));
+        assertEquals(new Text(""), 
EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("\1there")));
+        assertEquals(new Text("hello there"), 
EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello there")));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/pom.xml
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/pom.xml b/partition/partition.rdf/pom.xml
new file mode 100644
index 0000000..2701d64
--- /dev/null
+++ b/partition/partition.rdf/pom.xml
@@ -0,0 +1,281 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>parent</artifactId>
+        <version>3.0.0.alpha1</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>mvm.mmrts.rdf</groupId>
+    <artifactId>partition.rdf</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <name>${project.groupId}.${project.artifactId}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-sparqlxml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-rdfxml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>sitestore.common</groupId>
+            <artifactId>common-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>cloudbase.utils</artifactId>
+        </dependency>
+
+        <!-- Cloudbase deps -->
+        <dependency>
+            <groupId>cloudbase</groupId>
+            <artifactId>cloudbase-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+
+        <!-- Test -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Deps that are transitive but listed anyway
+
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryalgebra-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-serql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-sparql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-serql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-binary</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-sparqljson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-text</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-manager</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-event</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-sail</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-sail-memory</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-sail-inferencer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryalgebra-evaluation</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-http-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-contextaware</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-repository-dataset</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-http-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-ntriples</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-n3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-trix</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-turtle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-trig</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-sail-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-sail-nativerdf</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-sail-rdbms</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-collections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-iteration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-lang</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-i18n</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-concurrent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-xml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-text</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>info.aduna.commons</groupId>
+            <artifactId>aduna-commons-net</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>  -->
+
+    </dependencies>
+    <repositories>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>aduna-opensource.releases</id>
+            <name>Aduna Open Source - Maven releases</name>
+            <url>http://repo.aduna-software.org/maven2/releases</url>
+        </repository>
+    </repositories>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*IntegrationTest.java
+                        </exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
new file mode 100644
index 0000000..0c723a1
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
@@ -0,0 +1,34 @@
+package mvm.mmrts.rdf.partition;
+
+/**
+ * Class InvalidValueTypeMarkerRuntimeException
+ * Date: Jan 7, 2011
+ * Time: 12:58:27 PM
+ */
+public class InvalidValueTypeMarkerRuntimeException extends RuntimeException {
+    private int valueTypeMarker = -1;
+
+    public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker) {
+        super();
+        this.valueTypeMarker = valueTypeMarker;
+    }
+
+    public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s) {
+        super(s);
+        this.valueTypeMarker = valueTypeMarker;
+    }
+
+    public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s, Throwable throwable) {
+        super(s, throwable);
+        this.valueTypeMarker = valueTypeMarker;
+    }
+
+    public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, Throwable throwable) {
+        super(throwable);
+        this.valueTypeMarker = valueTypeMarker;
+    }
+
+    public int getValueTypeMarker() {
+        return valueTypeMarker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
new file mode 100644
index 0000000..83e0675
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
@@ -0,0 +1,306 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.client.BatchWriter;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.admin.TableOperations;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Range;
+import cloudbase.core.security.ColumnVisibility;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import info.aduna.iteration.CloseableIteration;
+import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
+import mvm.mmrts.rdf.partition.iterators.NamespaceIterator;
+import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor;
+import mvm.mmrts.rdf.partition.query.evaluation.PartitionEvaluationStrategy;
+import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer;
+import mvm.mmrts.rdf.partition.shard.ShardValueGenerator;
+import mvm.mmrts.rdf.partition.utils.ContextsStatementImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.*;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.impl.EmptyBindingSet;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailConnectionBase;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+/**
+ * Class PartitionConnection
+ * Date: Jul 6, 2011
+ * Time: 4:40:49 PM
+ * <p/>
+ * Ingest:
+ * Triple ->
+ * - <subject> <shard>:
+ * - <shard> event:<subject>\0<predicate>\0<object>
+ * - <shard> index:<predicate>\1<object>\0
+ * <p/>
+ * Namespace ->
+ * - <prefix> ns:<namespace>
+ */
+public class PartitionConnection extends SailConnectionBase {
+
+    private PartitionSail sail;
+    private BatchWriter writer;
+    private BatchWriter shardTableWriter;   //MMRTS-148
+    
+    private Multimap<Resource, ContextsStatementImpl> statements = HashMultimap.create(10000, 10);
+
+
+    public PartitionConnection(PartitionSail sailBase) throws SailException {
+        super(sailBase);
+        this.sail = sailBase;
+        this.initialize();
+    }
+
+    protected void initialize() throws SailException {
+        try {
+            Connector connector = sail.connector;
+            String table = sail.table;
+            String shardTable = sail.shardTable;
+
+            //create these tables if they do not exist
+            TableOperations tableOperations = connector.tableOperations();
+            boolean tableExists = tableOperations.exists(table);
+            if (!tableExists)
+                tableOperations.create(table);
+
+            tableExists = tableOperations.exists(shardTable);
+            if(!tableExists)
+                tableOperations.create(shardTable);
+
+            writer = connector.createBatchWriter(table, 1000000l, 60000l, 10);
+            shardTableWriter = connector.createBatchWriter(shardTable, 1000000l, 60000l, 10);
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+    }
+
+    @Override
+    protected void closeInternal() throws SailException {
+        try {
+            writer.close();
+            shardTableWriter.close();
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+    }
+
+    @Override
+    protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet, boolean b) throws SailException {
+//        throw new UnsupportedOperationException("Query not supported");
+
+        if (!(tupleExpr instanceof QueryRoot))
+            tupleExpr = new QueryRoot(tupleExpr);
+
+        try {
+            Configuration queryConf = populateConf(bindingSet);
+            //timeRange filter check
+            tupleExpr.visit(new FilterTimeIndexVisitor(queryConf));
+
+            (new SubjectGroupingOptimizer(queryConf)).optimize(tupleExpr, dataset, bindingSet);
+            PartitionTripleSource source = new PartitionTripleSource(this.sail, queryConf);
+
+            PartitionEvaluationStrategy strategy = new PartitionEvaluationStrategy(
+                    source, dataset);
+
+            return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance());
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+
+    }
+
+    protected Configuration populateConf(BindingSet bs) {
+        Configuration conf = new Configuration(this.sail.conf);
+
+        for (String bname : bs.getBindingNames()) {
+            conf.set(bname, bs.getValue(bname).stringValue());
+        }
+        Binding start = bs.getBinding(START_BINDING);
+        if (start != null)
+            conf.setLong(START_BINDING, Long.parseLong(start.getValue().stringValue()));
+
+        Binding end = bs.getBinding(END_BINDING);
+        if (end != null)
+            conf.setLong(END_BINDING, Long.parseLong(end.getValue().stringValue()));
+
+        Binding timePredicate = bs.getBinding(TIME_PREDICATE);
+        if (timePredicate != null)
+            conf.set(TIME_PREDICATE, timePredicate.getValue().stringValue());
+
+        Binding timeType = bs.getBinding(TIME_TYPE_PROP);
+        if (timeType != null)
+            conf.set(TIME_TYPE_PROP, timeType.getValue().stringValue());
+        else if (timePredicate != null)
+            conf.set(TIME_TYPE_PROP, TimeType.XMLDATETIME.name()); //default to xml datetime
+
+        return conf;
+    }
+
+    @Override
+    protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException {
+        throw new UnsupportedOperationException("Contexts not supported");
+    }
+
+    @Override
+    protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource resource, URI uri, Value value, boolean b, Resource... resources) throws SailException {
+        throw new UnsupportedOperationException("Query not supported");
+    }
+
+    @Override
+    protected long sizeInternal(Resource... resources) throws SailException {
+        throw new UnsupportedOperationException("Size operation not 
supported");
+    }
+
+    @Override
+    protected void startTransactionInternal() throws SailException {
+        // no transaction support as of yet
+    }
+
+    @Override
+    protected void commitInternal() throws SailException {
+        try {
+            ShardValueGenerator gen = sail.generator;
+            ContextColVisConverter contextColVisConverter = sail.contextColVisConverter;
+            Map<Resource, Collection<ContextsStatementImpl>> map = statements.asMap();
+            for (Map.Entry<Resource, Collection<ContextsStatementImpl>> entry : map.entrySet()) {
+                Resource subject = entry.getKey();
+                byte[] subj_bytes = writeValue(subject);
+                String shard = gen.generateShardValue(subject);
+                Text shard_txt = new Text(shard);
+                Collection<ContextsStatementImpl> stmts = entry.getValue();
+
+                /**
+                 * Triple ->
+                 * - <subject> <shard>:
+                 * - <shard> event:<subject>\0<predicate>\0<object>
+                 * - <shard> index:<predicate>\1<object>\0
+                 */
+                Mutation m_subj = new Mutation(shard_txt);
+                for (ContextsStatementImpl stmt : stmts) {
+                    Resource[] contexts = stmt.getContexts();
+                    ColumnVisibility vis = null;
+                    if (contexts != null && contexts.length > 0 && contextColVisConverter != null) {
+                        vis = contextColVisConverter.convertContexts(contexts);
+                    }
+
+                    if (vis != null) {
+                        m_subj.put(DOC, new Text(writeStatement(stmt, true)), vis, EMPTY_VALUE);
+                        m_subj.put(INDEX, new Text(writeStatement(stmt, false)), vis, EMPTY_VALUE);
+                    } else {
+                        m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE);
+                        m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE);
+                    }
+                }
+
+                /**
+                 * TODO: Is this right?
+                 * If the subject does not have any authorizations specified, then anyone can access it.
+                 * But the true authorization check will happen at the predicate/object level, which means that
+                 * the set returned will only be what the person is authorized to see. The shard lookup table has to
+                 * have the lowest level authorization of all the predicate/object authorizations; otherwise,
+                 * a user may not be able to see the correct document.
+                 */
+                Mutation m_shard = new Mutation(new Text(subj_bytes));
+                m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE);
+                shardTableWriter.addMutation(m_shard);
+
+                writer.addMutation(m_subj);
+            }
+
+            writer.flush();
+            shardTableWriter.flush();
+            statements.clear();
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+    }
+
+    @Override
+    protected void rollbackInternal() throws SailException {
+        statements.clear();
+    }
+
+    @Override
+    protected void addStatementInternal(Resource subject, URI predicate, Value object, Resource... contexts) throws SailException {
+        statements.put(subject, new ContextsStatementImpl(subject, predicate, object, contexts));
+    }
+
+    @Override
+    protected void removeStatementsInternal(Resource resource, URI uri, Value value, Resource... contexts) throws SailException {
+        throw new UnsupportedOperationException("Remove not supported as of yet");
+    }
+
+    @Override
+    protected void clearInternal(Resource... resources) throws SailException {
+        throw new UnsupportedOperationException("Clear with context not 
supported as of yet");
+    }
+
+    @Override
+    protected CloseableIteration<? extends Namespace, SailException> getNamespacesInternal() throws SailException {
+        return new NamespaceIterator(sail.connector, sail.table);
+    }
+
+    @Override
+    protected String getNamespaceInternal(String prefix) throws SailException {
+        try {
+            Scanner scanner = sail.connector.createScanner(sail.table, ALL_AUTHORIZATIONS);
+            scanner.setRange(new Range(new Text(prefix)));
+            scanner.fetchColumnFamily(NAMESPACE);
+            Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator();
+            if (iter != null && iter.hasNext())
+                return iter.next().getKey().getColumnQualifier().toString();
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+        return null;
+    }
+
+    @Override
+    protected void setNamespaceInternal(String prefix, String namespace) throws SailException {
+        /**
+         * Namespace ->
+         * - <prefix> <namespace>:
+         */
+
+        try {
+            Mutation m = new Mutation(new Text(prefix));
+            m.put(NAMESPACE, new Text(namespace), EMPTY_VALUE);
+            writer.addMutation(m);
+        } catch (Exception e) {
+            throw new SailException(e);
+        }
+    }
+
+    @Override
+    protected void removeNamespaceInternal
+            (String
+                    s) throws SailException {
+        throw new UnsupportedOperationException("Namespace remove not 
supported");
+    }
+
+    @Override
+    protected void clearNamespacesInternal() throws SailException {
+        throw new UnsupportedOperationException("Namespace Clear not supported");
+    }
+
+}
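
The commit path in commitInternal() follows the row layout documented in the class javadoc: one mutation per subject in the partition table (event and index column families under the shard row), plus one row per subject in the shard lookup table. A rough illustration of the mutations produced for a single statement; the shard string and serialized statement text are simplified stand-ins for ShardValueGenerator and RdfIO.writeStatement output, which live outside this file:

    import cloudbase.core.data.Mutation;
    import cloudbase.core.data.Value;
    import org.apache.hadoop.io.Text;

    public class IngestLayoutSketch {
        public static void main(String[] args) {
            Value empty = new Value(new byte[0]);
            String shard = "20110706_42";                 // gen.generateShardValue(subject)

            // Partition table: <shard> event:<subj>\0<pred>\0<obj> and index:<pred>\1<obj>\0
            Mutation mSubj = new Mutation(new Text(shard));
            mSubj.put(new Text("event"), new Text("urn:a#s\0urn:a#p\0o"), empty);
            mSubj.put(new Text("index"), new Text("urn:a#p\1o\0"), empty);

            // Shard lookup table (MMRTS-148): <subject> <shard>:
            Mutation mShard = new Mutation(new Text("urn:a#s"));
            mShard.put(new Text(shard), new Text(""), empty);
        }
    }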

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
new file mode 100644
index 0000000..cb69596
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
@@ -0,0 +1,141 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.Authorizations;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Literal;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Class PartitionConstants
+ * Date: Jul 6, 2011
+ * Time: 12:22:55 PM
+ */
+public class PartitionConstants {
+
+    public static final String PARTITION_NS = "urn:mvm.mmrts.partition.rdf/08/2011#";
+    public static ValueFactory VALUE_FACTORY = ValueFactoryImpl.getInstance();
+    public static URI TIMERANGE = VALUE_FACTORY.createURI(PARTITION_NS, "timeRange");
+    public static URI SHARDRANGE = VALUE_FACTORY.createURI(PARTITION_NS, "shardRange"); //shardRange(subject, start, stop) in ms
+    public static Literal EMPTY_LITERAL = VALUE_FACTORY.createLiteral(0);
+
+    public static final byte FAMILY_DELIM = 0;
+    public static final String FAMILY_DELIM_STR = "\0";
+    public static final byte INDEX_DELIM = 1;
+    public static final String INDEX_DELIM_STR = "\1";
+
+    /* RECORD TYPES */
+//    public static final int NAMESPACE_MARKER = 2;
+//
+//    public static final int EXPL_TRIPLE_MARKER = 3;
+//
+//    public static final int EXPL_QUAD_MARKER = 4;
+//
+//    public static final int INF_TRIPLE_MARKER = 5;
+//
+//    public static final int INF_QUAD_MARKER = 6;
+
+    public static final int URI_MARKER = 7;
+
+    public static final String URI_MARKER_STR = "\07";
+
+    public static final int BNODE_MARKER = 8;
+
+    public static final int PLAIN_LITERAL_MARKER = 9;
+
+    public static final String PLAIN_LITERAL_MARKER_STR = "\u0009";
+
+    public static final int LANG_LITERAL_MARKER = 10;
+
+    public static final int DATATYPE_LITERAL_MARKER = 11;
+
+    public static final String DATATYPE_LITERAL_MARKER_STR = "\u000B";
+
+    public static final int EOF_MARKER = 127;
+
+    // public static final Authorizations ALL_AUTHORIZATIONS = new Authorizations(
+    // "_");
+    public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS;
+
+    public static final Value EMPTY_VALUE = new Value(new byte[0]);
+    public static final Text EMPTY_TXT = new Text("");
+
+    /* Column Families and Qualifiers */
+    public static final Text INDEX = new Text("index");
+    public static final Text DOC = new Text("event");
+    public static final Text NAMESPACE = new Text("ns");
+
+    /* Time constants */
+    public static final String START_BINDING = "binding.start";
+    public static final String END_BINDING = "binding.end";
+    public static final String TIME_PREDICATE = "binding.timePredicate";
+    public static final String SHARDRANGE_BINDING = "binding.shardRange";
+    public static final String SHARDRANGE_START = "binding.shardRange.start";
+    public static final String SHARDRANGE_END = "binding.shardRange.end";
+    public static final String TIME_TYPE_PROP = "binding.timeProp";
+    public static final String AUTHORIZATION_PROP = "binding.authorization";
+    public static final String NUMTHREADS_PROP = "binding.numthreads";
+    public static final String ALLSHARDS_PROP = "binding.allshards";
+
+    public static final String VALUE_DELIMITER = "\03";
+
+    public static final SimpleDateFormat XMLDATE = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+    public enum TimeType {
+        TIMESTAMP, XMLDATETIME
+    }
+
+    public static boolean isTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
+        return (configuration.get(TIME_PREDICATE) != null) || (lookup.getTimePredicate() != null);
+    }
+
+    public static Long validateFillStartTime(Long start, ShardSubjectLookup lookup) {
+        if (lookup.getShardStartTimeRange() != null)
+            return Long.parseLong(lookup.getShardStartTimeRange());
+        return (start == null) ? 0l : start;
+    }
+
+    public static Long validateFillEndTime(Long end, ShardSubjectLookup lookup) {
+        if (lookup.getShardEndTimeRange() != null)
+            return Long.parseLong(lookup.getShardEndTimeRange());
+        return (end == null) ? System.currentTimeMillis() : end;
+    }
+
+    public static String getStartTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
+        String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate());
+        String st = configProperty(configuration, START_BINDING, lookup.getStartTimeRange());
+        TimeType tt = lookup.getTimeType();
+        if (tt == null)
+            tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP));
+        return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(st), tt);
+    }
+
+    public static String getEndTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
+        String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate());
+        String et = configProperty(configuration, END_BINDING, lookup.getEndTimeRange());
+        TimeType tt = lookup.getTimeType();
+        if (tt == null)
+            tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP));
+        return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(et), tt);
+    }
+
+    public static String convertTime(Long timestamp, TimeType timeType) {
+        return (TimeType.XMLDATETIME.equals(timeType))
+                ? (DATATYPE_LITERAL_MARKER_STR + XMLDATE.format(new Date(timestamp)))
+                : PLAIN_LITERAL_MARKER_STR + timestamp;
+    }
+
+    public static String configProperty(Configuration configuration, String property, String checkValue) {
+        if (checkValue == null)
+            return configuration.get(property);
+        return checkValue;
+    }
+}
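
[Editor's note] A minimal sketch of the index-key layout these markers and helpers produce, assuming the static members defined above (URI_MARKER_STR, INDEX_DELIM_STR, convertTime, TimeType); the predicate URI and timestamp below are placeholders, not values from the commit:

    // hypothetical example, not part of the commit
    String predicate = "urn:test#timestamp";   // placeholder time predicate
    long startMs = 1314871200000L;             // placeholder epoch millis
    // TIMESTAMP encodes the raw millis behind a plain-literal marker;
    // XMLDATETIME encodes a formatted date behind a datatype-literal marker.
    String startKey = URI_MARKER_STR + predicate + INDEX_DELIM_STR
            + convertTime(startMs, TimeType.TIMESTAMP);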

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java
new file mode 100644
index 0000000..07eb411
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java
@@ -0,0 +1,122 @@
+package mvm.mmrts.rdf.partition;
+
+import cloudbase.core.client.CBException;
+import cloudbase.core.client.CBSecurityException;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.ZooKeeperInstance;
+import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import mvm.mmrts.rdf.partition.shard.ShardValueGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailBase;
+
+/**
+ * Class PartitionSail
+ * Date: Jul 6, 2011
+ * Time: 11:40:52 AM
+ */
+public class PartitionSail extends SailBase {
+
+    protected Connector connector;
+
+    protected String table;
+    //MMRTS-148
+    protected String shardTable;
+
+    protected ShardValueGenerator generator = new DateHashModShardValueGenerator();
+
+    protected Configuration conf = new Configuration();
+
+    protected ContextColVisConverter contextColVisConverter;
+
+    public PartitionSail(Connector connector, String table) {
+        this(connector, table, table, null);
+    }
+
+    public PartitionSail(Connector connector, String table, String shardTable) {
+        this(connector, table, shardTable, null);
+    }
+
+    public PartitionSail(String instance, String zk, String user, String password, String table)
+            throws CBSecurityException, CBException {
+        this(instance, zk, user, password, table, (ShardValueGenerator) null);
+    }
+
+    public PartitionSail(String instance, String zk, String user, String password, String table, ShardValueGenerator generator)
+            throws CBSecurityException, CBException {
+        this(new ZooKeeperInstance(instance, zk).getConnector(user, password.getBytes()), table, table, generator);
+    }
+
+    public PartitionSail(String instance, String zk, String user, String password, String table, String shardTable)
+            throws CBSecurityException, CBException {
+        this(instance, zk, user, password, table, shardTable, null);
+    }
+
+    public PartitionSail(String instance, String zk, String user, String password, String table, String shardTable, ShardValueGenerator generator)
+            throws CBSecurityException, CBException {
+        this(new ZooKeeperInstance(instance, zk).getConnector(user, password.getBytes()), table, shardTable, generator);
+    }
+
+    public PartitionSail(Connector connector, String table, ShardValueGenerator generator) {
+        this(connector, table, table, generator);
+    }
+
+    public PartitionSail(Connector connector, String table, String shardTable, ShardValueGenerator generator) {
+        this.connector = connector;
+        this.table = table;
+        this.shardTable = shardTable;
+        if (generator != null)
+            this.generator = generator;
+    }
+
+    @Override
+    protected void shutDownInternal() throws SailException {
+    }
+
+    @Override
+    protected SailConnection getConnectionInternal() throws SailException {
+        return new PartitionConnection(this);
+    }
+
+    @Override
+    public boolean isWritable() throws SailException {
+        return true;
+    }
+
+    @Override
+    public ValueFactory getValueFactory() {
+        return ValueFactoryImpl.getInstance();
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public ShardValueGenerator getGenerator() {
+        return generator;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getShardTable() {
+        return shardTable;
+    }
+
+    public ContextColVisConverter getContextColVisConverter() {
+        return contextColVisConverter;
+    }
+
+    public void setContextColVisConverter(ContextColVisConverter contextColVisConverter) {
+        this.contextColVisConverter = contextColVisConverter;
+    }
+}
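
[Editor's note] For orientation, a hedged wiring sketch for this class; the instance, zookeepers, credentials, and table names are placeholders, and checked-exception handling (CBException, CBSecurityException, SailException) is elided:

    // hypothetical usage, not part of the commit
    PartitionSail sail = new PartitionSail("instance", "zoo1:2181", "user", "secret", "rdfTable", "rdfShardTable");
    sail.initialize();                          // standard Sesame Sail lifecycle
    SailConnection conn = sail.getConnection(); // SailBase returns the PartitionConnection built above
    try {
        // read/write statements through conn here
    } finally {
        conn.close();
        sail.shutDown();
    }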

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java
new file mode 100644
index 0000000..ca7772b
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java
@@ -0,0 +1,40 @@
+package mvm.mmrts.rdf.partition;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.mmrts.rdf.partition.query.evaluation.ShardSubjectLookupStatementIterator;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.*;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.TripleSource;
+
+/**
+ * Class PartitionTripleSource
+ * Date: Jul 18, 2011
+ * Time: 10:45:06 AM
+ */
+public class PartitionTripleSource implements TripleSource {
+    private PartitionSail sail;
+    private Configuration configuration;
+
+    public PartitionTripleSource(PartitionSail sail, Configuration configuration) {
+        this.sail = sail;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public CloseableIteration<? extends Statement, QueryEvaluationException> getStatements(Resource resource, URI uri, Value value, Resource... resources) throws QueryEvaluationException {
+        // direct statement-pattern scans are not implemented here;
+        // evaluation is driven through the ShardSubjectLookup overload below
+        return null;
+    }
+
+    public CloseableIteration<BindingSet, QueryEvaluationException> getStatements(ShardSubjectLookup lookup, BindingSet bindings, Resource... contexts) throws QueryEvaluationException {
+        return new ShardSubjectLookupStatementIterator(sail, lookup, bindings, configuration);
+    }
+
+    @Override
+    public ValueFactory getValueFactory() {
+        return PartitionConstants.VALUE_FACTORY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
new file mode 100644
index 0000000..f462e9a
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
@@ -0,0 +1,14 @@
+package mvm.mmrts.rdf.partition.converter;
+
+import cloudbase.core.security.ColumnVisibility;
+import org.openrdf.model.Resource;
+
+/**
+ * Interface ContextColVisConverter
+ * Date: Aug 5, 2011
+ * Time: 7:35:40 AM
+ */
+public interface ContextColVisConverter {
+
+    public ColumnVisibility convertContexts(Resource... contexts);
+}
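
[Editor's note] A minimal sketch of one possible implementation, assuming contexts are URIs and that ColumnVisibility accepts a boolean expression string; the class name and labeling scheme are hypothetical, not part of the commit:

    public class LocalNameColVisConverter implements ContextColVisConverter {
        @Override
        public ColumnVisibility convertContexts(Resource... contexts) {
            if (contexts == null || contexts.length == 0)
                return new ColumnVisibility();   // empty visibility: unrestricted
            StringBuilder expr = new StringBuilder();
            for (int i = 0; i < contexts.length; i++) {
                if (i > 0)
                    expr.append("&");            // require every context label
                expr.append(((org.openrdf.model.URI) contexts[i]).getLocalName());
            }
            return new ColumnVisibility(expr.toString());
        }
    }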

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
new file mode 100644
index 0000000..fc007e9
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
@@ -0,0 +1,93 @@
+package mvm.mmrts.rdf.partition.iterators;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.TableNotFoundException;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import org.openrdf.model.Namespace;
+import org.openrdf.model.impl.NamespaceImpl;
+import org.openrdf.sail.SailException;
+
+import java.io.IOError;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+//TODO: Combine with CloudbaseStoreContextTableIterator4
+public class NamespaceIterator implements
+        CloseableIteration<Namespace, SailException> {
+
+    private boolean open = false;
+    private Iterator<Entry<Key, Value>> result;
+
+    public NamespaceIterator(Connector connector, String table) throws SailException {
+        initialize(connector, table);
+        open = true;
+    }
+
+    protected void initialize(Connector connector, String table) throws SailException {
+        try {
+            Scanner scanner = connector.createScanner(table,
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumnFamily(NAMESPACE);
+            result = scanner.iterator();
+        } catch (TableNotFoundException e) {
+            throw new SailException("Exception occurred in Namespace Iterator",
+                    e);
+        }
+    }
+
+    @Override
+    public void close() throws SailException {
+        try {
+            verifyIsOpen();
+            open = false;
+        } catch (IOError e) {
+            throw new SailException(e);
+        }
+    }
+
+    public void verifyIsOpen() throws SailException {
+        if (!open) {
+            throw new SailException("Iterator not open");
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws SailException {
+        verifyIsOpen();
+        return result != null && result.hasNext();
+    }
+
+    @Override
+    public Namespace next() throws SailException {
+        if (hasNext()) {
+            Namespace namespace = getNamespace(result);
+            return namespace;
+        }
+        return null;
+    }
+
+    public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) {
+        if (rowResults.hasNext()) {
+            Entry<Key, Value> next = rowResults.next();
+            Key key = next.getKey();
+            String cq = key.getColumnQualifier().toString();
+            return new NamespaceImpl(key.getRow().toString(), cq);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void remove() throws SailException {
+        // removal is not supported by this read-only, scanner-backed iterator
+        throw new UnsupportedOperationException("remove");
+    }
+
+    public boolean isOpen() {
+        return open;
+    }
+}
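
[Editor's note] A short usage sketch; the connector and table name are placeholders and exception handling is elided:

    // hypothetical usage, not part of the commit
    CloseableIteration<Namespace, SailException> namespaces = new NamespaceIterator(connector, "rdfTable");
    try {
        while (namespaces.hasNext()) {
            Namespace ns = namespaces.next();
            System.out.println(ns.getPrefix() + " -> " + ns.getName());
        }
    } finally {
        namespaces.close();
    }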

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
new file mode 100644
index 0000000..5964ea0
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
@@ -0,0 +1,113 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.impl.BooleanLiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.*;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import java.util.List;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+/**
+ * Class FilterTimeIndexVisitor
+ * Date: Apr 11, 2011
+ * Time: 10:16:15 PM
+ */
+public class FilterTimeIndexVisitor extends QueryModelVisitorBase<Exception> {
+
+    private Configuration conf;
+
+    public FilterTimeIndexVisitor(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public void meet(Filter node) throws Exception {
+        super.meet(node);
+
+        ValueExpr arg = node.getCondition();
+        if (arg instanceof FunctionCall) {
+            FunctionCall fc = (FunctionCall) arg;
+            if (SHARDRANGE.stringValue().equals(fc.getURI())) {
+                List<ValueExpr> valueExprs = fc.getArgs();
+                if (valueExprs.size() != 3) {
+                    throw new QueryEvaluationException("mvm:shardRange must 
have 3 parameters: subject to run time index on, startTime(ms), endTime(ms)");
+                }
+                ValueExpr subj = valueExprs.get(0);
+                String subj_s = null;
+                if (subj instanceof Var) {
+                    subj_s = ((Var) subj).getName();
+                } else if (subj instanceof ValueConstant) {
+                    subj_s = ((ValueConstant) subj).getValue().stringValue();
+                }
+                if (subj_s == null)
+                    return; //no changes; still need to figure out what shard lookup to add this time predicate to
+
+                String startTime = ((ValueConstant) valueExprs.get(1)).getValue().stringValue();
+                String endTime = ((ValueConstant) valueExprs.get(2)).getValue().stringValue();
+
+                this.conf.set(subj_s + "." + SHARDRANGE_BINDING, "true");
+                this.conf.set(subj_s + "." + SHARDRANGE_START, startTime);
+                this.conf.set(subj_s + "." + SHARDRANGE_END, endTime);
+
+                node.setCondition(new ValueConstant(BooleanLiteralImpl.TRUE));
+            }
+            if (TIMERANGE.stringValue().equals(fc.getURI())) {
+                List<ValueExpr> valueExprs = fc.getArgs();
+                if (valueExprs.size() != 4 && valueExprs.size() != 5) {
+                    throw new QueryEvaluationException("mvm:timeRange must 
have 4/5 parameters: subject to run time index on, time uri to index, 
startTime, endTime, time type(XMLDATETIME, TIMESTAMP)");
+                }
+
+                ValueExpr subj = valueExprs.get(0);
+                String subj_s = null;
+                if (subj instanceof Var) {
+                    subj_s = ((Var) subj).getName();
+                } else if (subj instanceof ValueConstant) {
+                    subj_s = ((ValueConstant) subj).getValue().stringValue();
+                }
+                if (subj_s == null)
+                    return; //no changes; still need to figure out what shard lookup to add this time predicate to
+
+                ValueConstant timeUri_s = (ValueConstant) valueExprs.get(1);
+                URIImpl timeUri = new URIImpl(timeUri_s.getValue().stringValue());
+                String startTime = ((ValueConstant) valueExprs.get(2)).getValue().stringValue();
+                String endTime = ((ValueConstant) valueExprs.get(3)).getValue().stringValue();
+                TimeType timeType = TimeType.XMLDATETIME;
+                if (valueExprs.size() > 4)
+                    timeType = TimeType.valueOf(((ValueConstant) valueExprs.get(4)).getValue().stringValue());
+
+                this.conf.set(subj_s + "." + TIME_PREDICATE, timeUri.stringValue());
+                this.conf.set(subj_s + "." + START_BINDING, startTime);
+                this.conf.set(subj_s + "." + END_BINDING, endTime);
+                this.conf.set(subj_s + "." + TIME_TYPE_PROP, timeType.name());
+
+                //not setting global start-end times (commented-out handling kept below)
+//                String startTime_global = conf.get(START_BINDING);
+//                String endTime_global = conf.get(END_BINDING);
+//                if (startTime_global != null) {
+//                    long startTime_l = Long.parseLong(startTime);
+//                    long startTime_lg = Long.parseLong(startTime_global);
+//                    if (startTime_l < startTime_lg)
+//                        conf.set(START_BINDING, startTime);
+//                } else
+//                    conf.set(START_BINDING, startTime);
+//
+//                if (endTime_global != null) {
+//                    long endTime_l = Long.parseLong(endTime);
+//                    long endTime_lg = Long.parseLong(endTime_global);
+//                    if (endTime_l > endTime_lg)
+//                        conf.set(END_BINDING, endTime);
+//                } else
+//                    conf.set(END_BINDING, endTime);
+
+                node.setCondition(new ValueConstant(BooleanLiteralImpl.TRUE));
+            }
+        }
+    }
+
+}
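
[Editor's note] To illustrate what this visitor rewrites, a hypothetical query string; the mvm:timeRange function name is taken from the exception messages above, while the predicate URI and millisecond bounds are placeholders:

    // hypothetical example, not part of the commit
    String sparql =
            "SELECT ?s ?o WHERE { " +
            "  ?s <urn:test#pred> ?o . " +
            "  FILTER(mvm:timeRange(?s, <urn:test#timestamp>, \"1300000000000\", \"1310000000000\", \"TIMESTAMP\")) " +
            "}";
    // After meet(Filter) runs, the FILTER condition is replaced with constant TRUE and
    // the range moves into the Configuration, keyed per subject variable, e.g.
    // "s.binding.timePredicate", "s.binding.start", "s.binding.end", "s.binding.timeProp".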

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
----------------------------------------------------------------------
diff --git 
a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
new file mode 100644
index 0000000..bf898ff
--- /dev/null
+++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
@@ -0,0 +1,70 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import cloudbase.core.client.Connector;
+import info.aduna.iteration.CloseableIteration;
+import mvm.mmrts.rdf.partition.PartitionTripleSource;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.TripleSource;
+import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
+
+import java.util.Map;
+
+/**
+ * Class PartitionEvaluationStrategy
+ * Date: Jul 14, 2011
+ * Time: 4:10:03 PM
+ */
+public class PartitionEvaluationStrategy extends EvaluationStrategyImpl {
+
+    public PartitionEvaluationStrategy(PartitionTripleSource tripleSource, Dataset dataset) {
+        super(tripleSource, dataset);
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings) throws QueryEvaluationException {
+        if (expr instanceof QueryRoot) {
+            System.out.println(expr); // debug: print the parsed query tree
+        } else if (expr instanceof ShardSubjectLookup) {
+            return this.evaluate((ShardSubjectLookup) expr, bindings);
+        }
+        return super.evaluate(expr, bindings);
+    }
+
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(ShardSubjectLookup lookup, BindingSet bindings) throws QueryEvaluationException {
+        if (bindings.size() > 0) {
+            Var subjVar = lookup.getSubject();
+            if(bindings.hasBinding(subjVar.getName())){
+                subjVar.setValue(bindings.getValue(subjVar.getName()));
+            }
+            //populate the lookup
+            for (Map.Entry<Var, Var> predObj : lookup.getPredicateObjectPairs()) {
+                Var predVar = predObj.getKey();
+                Var objVar = predObj.getValue();
+
+                if(bindings.hasBinding(predVar.getName())) {
+                    predVar.setValue(bindings.getValue(predVar.getName()));
+                }
+                if(bindings.hasBinding(objVar.getName())) {
+                    objVar.setValue(bindings.getValue(objVar.getName()));
+                }
+            }
+        }
+        return ((PartitionTripleSource) tripleSource).getStatements(lookup, bindings, new Resource[0]);
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(StatementPattern sp, BindingSet bindings) throws QueryEvaluationException {
+        ShardSubjectLookup lookup = new ShardSubjectLookup(sp.getSubjectVar());
+        lookup.addPredicateObjectPair(sp.getPredicateVar(), sp.getObjectVar());
+        return this.evaluate(lookup, bindings);
+    }
+}
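
[Editor's note] As a pointer to how the StatementPattern fallback above behaves, a hedged sketch of the rewrite it performs on a single pattern; the variable names are placeholders:

    // hypothetical example, not part of the commit
    StatementPattern sp = new StatementPattern(new Var("s"), new Var("p"), new Var("o"));
    ShardSubjectLookup lookup = new ShardSubjectLookup(sp.getSubjectVar());
    lookup.addPredicateObjectPair(sp.getPredicateVar(), sp.getObjectVar());
    // evaluate(lookup, bindings) then pushes any existing bindings into the
    // lookup's vars and scans the shard table via PartitionTripleSource.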
