http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
new file mode 100644
index 0000000..4b369ae
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
@@ -0,0 +1,155 @@
+package mvm.mmrts.rdf.partition.mr;
+
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Class SparqlTestDriver
+ * Date: Oct 28, 2010
+ * Time: 2:53:39 PM
+ */
+public class SparqlTestDriver implements Tool {
+
+    /**
+     * Command-line entry point: runs this tool through {@link ToolRunner} with a
+     * fresh {@link Configuration}. Any exception is printed via
+     * {@code printStackTrace()}; the value returned by the tool is not propagated.
+     */
+    public static void main(String[] args) {
+        try {
+            final Configuration configuration = new Configuration();
+            final Tool driver = new SparqlTestDriver();
+            ToolRunner.run(configuration, driver, args);
+        } catch (Exception failure) {
+            failure.printStackTrace();
+        }
+    }
+
+    // Hadoop configuration supplied through setConf(); run() builds its Job from it.
+    private Configuration conf;
+
+    /** Returns the configuration last supplied through {@link #setConf(Configuration)}. */
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /** Stores the configuration this tool uses when it builds its job. */
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Configures and runs a MapReduce job that feeds the SPARQL query read from a
+     * local file to {@code SparqlPartitionStoreInputFormat} and writes the mapped
+     * results to an output directory.
+     *
+     * @param args {@code args[0]} is the local file holding the SPARQL query;
+     *             {@code args[1]} is the output directory, which is deleted
+     *             recursively first if it already exists
+     * @return the job's {@code REDUCE_OUTPUT_RECORDS} counter (cast to int) on
+     *         success, {@code -1} if the job failed
+     * @throws IllegalArgumentException if fewer than two arguments are supplied
+     * @throws IOException if the query file cannot be read or a file system call fails
+     */
+    public int run(String[] args) throws IOException, InterruptedException,
+            ClassNotFoundException {
+
+        //query from file
+        if (args.length < 2) {
+            throw new IllegalArgumentException("Usage: hadoop jar mvm.mmrts.rdf.partition.mr.SparqlTestDriver <local query file> outputFile");
+        }
+
+        String query = readQuery(args[0]);
+
+        Job job = new Job(conf);
+        job.setJarByClass(SparqlTestDriver.class);
+
+        // set up cloudbase input
+        // NOTE(review): user, password, instance and ZooKeeper address are hard-coded here.
+        job.setInputFormatClass(SparqlPartitionStoreInputFormat.class);
+        SparqlPartitionStoreInputFormat.setInputInfo(job, "root", "password".getBytes());
+        SparqlPartitionStoreInputFormat.setZooKeeperInstance(job, "stratus", "10.40.190.113:2181");
+        SparqlPartitionStoreInputFormat.setLongJob(job, null);
+        SparqlPartitionStoreInputFormat.setTable(job, "partitionRdf");
+
+        // Sample queries kept for reference (the query is now read from args[0]):
+//        String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
+//                "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
+//                "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
+//                "SELECT * WHERE\n" +
+//                "{\n" +
+//                "?id tdp:reportedAt ?timestamp. \n" +
+//                "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314380456900 , 1314384056900 , 'XMLDATETIME')).\n" +
+//                "?id tdp:performedBy ?system.\n" +
+//                "} \n";
+//
+//        String query2 = "PREFIX hb: <http://here/2010/tracked-data-provenance/heartbeat/ns#>\n" +
+//                "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
+//                "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
+//                "SELECT * WHERE\n" +
+//                "{\n" +
+//                "?id hb:timeStamp ?timestamp. \n" +
+//                "FILTER(mvmpart:timeRange(?id, hb:timeStamp, 1314360009522 , 1314367209522 , 'TIMESTAMP')).\n" +
+//                "?id hb:count ?count.\n" +
+//                "?id hb:systemName ?systemName.\n" +
+//                "} ";
+
+        System.out.println(query);
+        System.out.println();
+//        System.out.println(query2);
+
+        SparqlPartitionStoreInputFormat.setSparqlQueries(job, query);
+//        SparqlCloudbaseStoreInputFormat.setStartTime(job, 1309956861000l + "");
+//        SparqlCloudbaseStoreInputFormat.setTtl(job, 86400000 + "");
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        //job.setOutputFormatClass(FileOutputFormat.class);
+
+        // set mapper and reducer classes
+        job.setMapperClass(MyTempMapper.class);
+        job.setReducerClass(Reducer.class);
+        job.setNumReduceTasks(1);
+
+        // set output; an existing output directory is removed so the job can recreate it
+        Path outputDir = new Path(args[1]);
+        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
+        if (dfs.exists(outputDir)) {
+            dfs.delete(outputDir, true);
+        }
+
+        FileOutputFormat.setOutputPath(job, outputDir);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return (int) job
+                    .getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "REDUCE_OUTPUT_RECORDS").getValue();
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    /**
+     * Reads the whole local file into a string. The stream is closed in a
+     * {@code finally} block so it is released even when reading fails.
+     */
+    private static String readQuery(String queryFile) throws IOException {
+        FileInputStream fis = new FileInputStream(queryFile);
+        try {
+            // NOTE(review): decoded with the platform default charset, as before;
+            // consider an explicit charset if queries may contain non-ASCII text.
+            return new String(ByteStreams.toByteArray(fis));
+        } finally {
+            fis.close();
+        }
+    }
+
+    /**
+     * Mapper that emits, for every input record, the string form of the input
+     * map's values as the key and the constant text "partition" as the value.
+     */
+    public static class MyTempMapper extends Mapper<LongWritable, MapWritable, Text, Text> {
+        Text outKey = new Text();
+        Text outValue = new Text("partition");
+
+        @Override
+        protected void map(LongWritable key, MapWritable value, Context context) throws IOException, InterruptedException {
+            final String renderedValues = value.values().toString();
+            outKey.set(renderedValues);
+            context.write(outKey, outValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
new file mode 100644
index 0000000..80255ba
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
@@ -0,0 +1,154 @@
+package mvm.mmrts.rdf.partition.mr;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Bytes;
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Statement;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Class TestDriver
+ * Date: Oct 28, 2010
+ * Time: 2:53:39 PM
+ */
+public class TestDriver implements Tool {
+
+    /**
+     * Command-line entry point: runs this tool through {@link ToolRunner} with a
+     * fresh {@link Configuration}. Any exception is printed via
+     * {@code printStackTrace()}; the value returned by the tool is not propagated.
+     */
+    public static void main(String[] args) {
+        try {
+            final Configuration configuration = new Configuration();
+            final Tool driver = new TestDriver();
+            ToolRunner.run(configuration, driver, args);
+        } catch (Exception failure) {
+            failure.printStackTrace();
+        }
+    }
+
+    // Hadoop configuration supplied through setConf(); run() builds its Job from it.
+    private Configuration conf;
+
+    /** Returns the configuration last supplied through {@link #setConf(Configuration)}. */
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /** Stores the configuration this tool uses when it builds its job. */
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Configures and runs a job that reads text lines from a fixed input path,
+     * maps them with {@link SubjectMapWrMapper}, merges them with
+     * {@link OutMapWrReducer} and writes text output to a fixed directory, which
+     * is deleted recursively first if it already exists.
+     *
+     * @param args not used
+     * @return the job's {@code REDUCE_OUTPUT_RECORDS} counter (cast to int) on
+     *         success, {@code -1} if the job failed
+     */
+    public int run(String[] args) throws IOException, InterruptedException,
+            ClassNotFoundException {
+
+        final Job job = new Job(conf);
+        job.setJarByClass(TestDriver.class);
+
+        // input: plain text lines from a hard-coded location
+        FileInputFormat.addInputPaths(job, "/temp/rpunnoose/results.txt");
+        job.setInputFormatClass(TextInputFormat.class);
+
+        // key/value types of the map and reduce stages
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(MapWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        // mapper, reducer and a single reduce task
+        job.setMapperClass(SubjectMapWrMapper.class);
+        job.setReducerClass(OutMapWrReducer.class);
+        job.setNumReduceTasks(1);
+//        job.setNumReduceTasks(0);
+
+        // output: clear any previous result before pointing the job at it
+        final Path target = new Path("/temp/rpunnoose/partBS");
+        final FileSystem fileSystem = FileSystem.get(target.toUri(), conf);
+        if (fileSystem.exists(target)) {
+            fileSystem.delete(target, true);
+        }
+        FileOutputFormat.setOutputPath(job, target);
+
+        // submit and wait
+        final Date begin = new Date();
+        System.out.println("Job started: " + begin);
+        if (!job.waitForCompletion(true)) {
+            System.out.println("Job Failed!!!");
+            return -1;
+        }
+
+        final Date finish = new Date();
+        System.out.println("Job ended: " + finish);
+        final long elapsedSeconds = (finish.getTime() - begin.getTime()) / 1000;
+        System.out.println("The job took " + elapsedSeconds + " seconds.");
+
+        final long reduceOutputRecords = job
+                .getCounters()
+                .findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS")
+                .getValue();
+        return (int) reduceOutputRecords;
+    }
+
+    /**
+     * Mapper that decodes each input line into a statement and, when the
+     * predicate contains "performedBy", "reportedAt" or "type", emits the subject
+     * as key together with a map holding the subject under "id" and the
+     * statement's object under "system", "timestamp" or "type" respectively.
+     * Statements with any other predicate are dropped.
+     */
+    public static class SubjectMapWrMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
+        Text outKey = new Text();
+        final String ID = "id";
+        final Text ID_TXT = new Text(ID);
+        final String PERF_AT = "performedBy";
+        final Text PERF_AT_TXT = new Text("system");
+        final String REPORT_AT = "reportedAt";
+        final Text REPORT_AT_TXT = new Text("timestamp");
+        final String TYPE = "type";
+        final Text TYPE_TXT = new Text(TYPE);
+
+        @Override
+        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            final String line = value.toString();
+            // everything before the last NUL character is handed to RdfIO as the serialized statement
+            final int terminator = line.lastIndexOf("\0");
+            final byte[] serialized = line.substring(0, terminator).getBytes();
+            final Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(serialized), PartitionConstants.VALUE_FACTORY);
+
+            // pick the output column from the first matching predicate fragment
+            final String predicate = stmt.getPredicate().stringValue();
+            final Text column;
+            if (predicate.contains(PERF_AT)) {
+                column = PERF_AT_TXT;
+            } else if (predicate.contains(REPORT_AT)) {
+                column = REPORT_AT_TXT;
+            } else if (predicate.contains(TYPE)) {
+                column = TYPE_TXT;
+            } else {
+                // not one of the predicates of interest
+                return;
+            }
+
+            outKey.set(stmt.getSubject().stringValue());
+            final MapWritable row = new MapWritable();
+            row.put(ID_TXT, outKey);
+            row.put(column, new Text(stmt.getObject().stringValue()));
+
+            context.write(outKey, row);
+        }
+    }
+
+    /**
+     * Reducer that merges all maps received for one key into a single map and
+     * emits the string form of the merged values as key, with the constant text
+     * "partitionBS" as value.
+     */
+    public static class OutMapWrReducer extends Reducer<Text, MapWritable, Text, Text> {
+        final Text PART = new Text("partitionBS");
+        Text outKey = new Text();
+
+        @Override
+        protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
+            // later maps overwrite entries of earlier ones that share a key
+            final MapWritable merged = new MapWritable();
+            for (MapWritable partial : values) {
+                merged.putAll(partial);
+            }
+
+            final String renderedValues = merged.values().toString();
+            outKey.set(renderedValues);
+            context.write(outKey, PART);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
new file mode 100644
index 0000000..2b4565f
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
@@ -0,0 +1,229 @@
+package mvm.mmrts.rdf.partition.mr.compat;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
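+/**
+ * Rewrites the date portion of the shard identifiers stored in the partition
+ * table from the old date format and delimiter to the new ones.
+ * NOTE(review): the previous header was copied from MoveShardIndexTool
+ * ("MMRTS-148 Need to move the shard index from the partition table to the
+ * shardIndex table"); confirm which ticket covers this tool.
+ * Class ChangeShardDateFormatTool
+ * Date: Dec 8, 2011
+ * Time: 4:11:40 PM
+ */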
+public class ChangeShardDateFormatTool implements Tool {
+    // Configuration property names read by runJob(); each falls back to the
+    // default held in the matching instance field below when unset.
+    public static final String CB_USERNAME_PROP = "cb.username";
+    public static final String CB_PWD_PROP = "cb.pwd";
+    public static final String CB_ZK_PROP = "cb.zk";
+    public static final String CB_INSTANCE_PROP = "cb.instance";
+    public static final String PARTITION_TABLE_PROP = "partition.table";
+    public static final String OLD_DATE_FORMAT_PROP = "date.format.old";
+    public static final String NEW_DATE_FORMAT_PROP = "date.format.new";
+    public static final String OLD_DATE_SHARD_DELIM = "date.shard.delim.old";
+    public static final String NEW_DATE_SHARD_DELIM = "date.shard.delim.new";
+
+
+    // Hadoop configuration supplied through setConf(); runJob() reads and updates it.
+    private Configuration conf;
+
+    // Defaults used when the corresponding property is absent from the configuration.
+    // NOTE(review): default credentials and the ZooKeeper address are hard-coded.
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.113:2181";
+    private String partitionTable = "rdfPartition";
+    private String oldDateFormat = "yyyy-MM";
+    private String newDateFormat = "yyyyMMdd";
+    private String oldDateDelim = "-";
+    private String newDateDelim = "_";
+
+    /** Returns the configuration last supplied through {@link #setConf(Configuration)}. */
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /** Stores the configuration this tool uses when it builds its job. */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Command-line entry point: runs this tool through {@link ToolRunner} with a
+     * fresh {@link Configuration}. Any exception is printed via
+     * {@code printStackTrace()}; the value returned by the tool is not propagated.
+     */
+    public static void main(String[] args) {
+        try {
+            final Configuration configuration = new Configuration();
+            final Tool tool = new ChangeShardDateFormatTool();
+            ToolRunner.run(configuration, tool, args);
+        } catch (Exception failure) {
+            failure.printStackTrace();
+        }
+    }
+
+    /**
+     * Runs the job and maps its outcome to an exit code.
+     *
+     * @param args passed through to {@link #runJob(String[])}
+     * @return {@code 0} if the job succeeded, {@code 1} if it failed
+     * @throws Exception whatever {@link #runJob(String[])} throws
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        // runJob returns -1 when the job fails and a counter value otherwise;
+        // previously the failure was dropped and 0 was always returned.
+        return runJob(args) < 0 ? 1 : 0;
+    }
+
+    /**
+     * Resolves the connection and format settings from the configuration
+     * (falling back to the field defaults), then runs a map-only job that reads
+     * the partition table and writes mutations back to the same table through
+     * {@link ChangeDateFormatMapper}.
+     *
+     * @param args not used
+     * @return the job's {@code REDUCE_OUTPUT_RECORDS} counter on success,
+     *         {@code -1} if the job failed
+     */
+    public long runJob(String[] args) throws Exception {
+        // tuning: no speculative execution, larger sort buffer
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+
+        // connection settings are only read
+        zk = conf.get(CB_ZK_PROP, zk);
+        instance = conf.get(CB_INSTANCE_PROP, instance);
+        userName = conf.get(CB_USERNAME_PROP, userName);
+        pwd = conf.get(CB_PWD_PROP, pwd);
+
+        // settings the mapper needs are written back so defaults reach the tasks
+        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
+        conf.set(PARTITION_TABLE_PROP, partitionTable);
+        oldDateFormat = conf.get(OLD_DATE_FORMAT_PROP, oldDateFormat);
+        conf.set(OLD_DATE_FORMAT_PROP, oldDateFormat);
+        newDateFormat = conf.get(NEW_DATE_FORMAT_PROP, newDateFormat);
+        conf.set(NEW_DATE_FORMAT_PROP, newDateFormat);
+        oldDateDelim = conf.get(OLD_DATE_SHARD_DELIM, oldDateDelim);
+        conf.set(OLD_DATE_SHARD_DELIM, oldDateDelim);
+        newDateDelim = conf.get(NEW_DATE_SHARD_DELIM, newDateDelim);
+        conf.set(NEW_DATE_SHARD_DELIM, newDateDelim);
+
+        final Job job = new Job(conf);
+        job.setJarByClass(ChangeShardDateFormatTool.class);
+
+        // input side
+        job.setInputFormatClass(CloudbaseInputFormat.class);
+        //TODO: How should I send in Auths?
+        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), partitionTable, CBConstants.NO_AUTHS);
+        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
+
+        job.setMapperClass(ChangeDateFormatMapper.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Mutation.class);
+
+        // output side
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, partitionTable);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+
+        job.setNumReduceTasks(0);
+
+        // submit and wait
+        final Date begin = new Date();
+        System.out.println("Job started: " + begin);
+        if (!job.waitForCompletion(true)) {
+            System.out.println("Job Failed!!!");
+            return -1;
+        }
+
+        final Date finish = new Date();
+        System.out.println("Job ended: " + finish);
+        final long elapsedSeconds = (finish.getTime() - begin.getTime()) / 1000;
+        System.out.println("The job took " + elapsedSeconds + " seconds.");
+
+        // NOTE(review): the job runs with zero reduce tasks, so this counter may always be 0.
+        return job
+                .getCounters()
+                .findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS")
+                .getValue();
+    }
+
+    /**
+     * Mapper that rewrites shard identifiers from the old date format and
+     * delimiter to the new ones. For entries whose column family is "event" or
+     * "index" the shard is the row; for every other entry the shard is the
+     * column family. Each converted entry is written under its new shard and the
+     * old entry is deleted. Entries without the old delimiter, or whose date part
+     * does not parse with the old format, are left untouched.
+     */
+    public static class ChangeDateFormatMapper extends Mapper<Key, Value, Text, Mutation> {
+        private SimpleDateFormat oldDateFormat_df;
+        private SimpleDateFormat newDateFormat_df;
+        private Text partTableTxt;
+        private String newDateDelim;
+        private String oldDateDelim;
+
+        /**
+         * Loads the formats, delimiters and table name from the job configuration.
+         *
+         * @throws IllegalArgumentException if any required property is missing
+         */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration configuration = context.getConfiguration();
+
+            oldDateFormat_df = new SimpleDateFormat(
+                    require(configuration, OLD_DATE_FORMAT_PROP, "Old Date Format property cannot be null"));
+            newDateFormat_df = new SimpleDateFormat(
+                    require(configuration, NEW_DATE_FORMAT_PROP, "New Date Format property cannot be null"));
+            partTableTxt = new Text(
+                    require(configuration, PARTITION_TABLE_PROP, "Partition Table property cannot be null"));
+            oldDateDelim = require(configuration, OLD_DATE_SHARD_DELIM, "Old Date Shard Delimiter property cannot be null");
+            newDateDelim = require(configuration, NEW_DATE_SHARD_DELIM, "New Date Shard Delimiter property cannot be null");
+        }
+
+        /** Returns the property value, or throws with the given message when it is unset. */
+        private static String require(Configuration configuration, String property, String message) {
+            String value = configuration.get(property);
+            if (value == null) {
+                throw new IllegalArgumentException(message);
+            }
+            return value;
+        }
+
+        /**
+         * Converts a shard of the form {@code <old date><old delim><value>} to
+         * {@code <new date><new delim><value>}.
+         *
+         * @return the converted shard, or {@code null} when the shard has no old
+         *         delimiter or its date part does not match the old format
+         */
+        private String convertShard(String shard) {
+            int delimAt = shard.lastIndexOf(oldDateDelim);
+            if (delimAt == -1) {
+                return null; //no shard?
+            }
+            String datePart = shard.substring(0, delimAt);
+            // skip the whole delimiter; it is configurable and may be longer than one character
+            String shardValue = shard.substring(delimAt + oldDateDelim.length());
+            try {
+                Date date = oldDateFormat_df.parse(datePart);
+                return newDateFormat_df.format(date) + newDateDelim + shardValue;
+            } catch (ParseException ignored) {
+                //only do work for the rows that match the old date format
+                return null;
+            }
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+            String cf = key.getColumnFamily().toString();
+            boolean shardIsRow = "event".equals(cf) || "index".equals(cf);
+            String oldShard = shardIsRow ? key.getRow().toString() : cf;
+
+            String newShard = convertShard(oldShard);
+            if (newShard == null) {
+                return;
+            }
+            if (newShard.equals(oldShard)) {
+                // Nothing to rename. Writing a put and a delete for the same key
+                // would presumably let the delete mask the put (TODO confirm).
+                return;
+            }
+
+            // NOTE(review): the deletes below carry no column visibility while the
+            // puts do; confirm they remove entries that have a non-empty visibility.
+            if (shardIsRow) {
+                Mutation insert = new Mutation(new Text(newShard));
+                insert.put(key.getColumnFamily(), key.getColumnQualifier(),
+                        new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
+                context.write(partTableTxt, insert);
+
+                //delete
+                Mutation delete = new Mutation(key.getRow());
+                delete.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
+                context.write(partTableTxt, delete);
+            } else {
+                //shard index
+                Mutation mutation = new Mutation(key.getRow());
+                mutation.put(new Text(newShard), key.getColumnQualifier(),
+                        new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
+
+                //delete
+                mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
+                context.write(partTableTxt, mutation);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
new file mode 100644
index 0000000..ba2eece
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
@@ -0,0 +1,171 @@
+package mvm.mmrts.rdf.partition.mr.compat;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+
+/**
+ * MMRTS-148 Need to move the shard index from the partition table to the 
shardIndex table
+ * Class MoveShardIndexTool
+ * Date: Dec 8, 2011
+ * Time: 4:11:40 PM
+ */
+public class MoveShardIndexTool implements Tool {
+    // Configuration property names. runJob() reads all but SHARD_INDEX_DELETE_PROP,
+    // falling back to the matching instance field below when a property is unset;
+    // SHARD_INDEX_DELETE_PROP is read by the mapper and defaults to false.
+    public static final String CB_USERNAME_PROP = "cb.username";
+    public static final String CB_PWD_PROP = "cb.pwd";
+    public static final String CB_ZK_PROP = "cb.zk";
+    public static final String CB_INSTANCE_PROP = "cb.instance";
+    public static final String PARTITION_TABLE_PROP = "partition.table";
+    public static final String SHARD_INDEX_TABLE_PROP = "shard.index.table";
+    public static final String SHARD_INDEX_DELETE_PROP = "shard.index.delete";
+
+
+    // Hadoop configuration supplied through setConf(); runJob() reads and updates it.
+    private Configuration conf;
+
+    // Defaults used when the corresponding property is absent from the configuration.
+    // NOTE(review): default credentials and the ZooKeeper address are hard-coded.
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.113:2181";
+    private String partitionTable = "rdfPartition";
+    private String shardIndexTable = "rdfShardIndex";
+
+    /** Returns the configuration last supplied through {@link #setConf(Configuration)}. */
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /** Stores the configuration this tool uses when it builds its job. */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Command-line entry point: runs this tool through {@link ToolRunner} with a
+     * fresh {@link Configuration}. Any exception is printed via
+     * {@code printStackTrace()}; the value returned by the tool is not propagated.
+     */
+    public static void main(String[] args) {
+        try {
+            final Configuration configuration = new Configuration();
+            final Tool tool = new MoveShardIndexTool();
+            ToolRunner.run(configuration, tool, args);
+        } catch (Exception failure) {
+            failure.printStackTrace();
+        }
+    }
+
+    /**
+     * Runs the job and maps its outcome to an exit code.
+     *
+     * @param args passed through to {@link #runJob(String[])}
+     * @return {@code 0} if the job succeeded, {@code 1} if it failed
+     * @throws Exception whatever {@link #runJob(String[])} throws
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        // runJob returns -1 when the job fails and a counter value otherwise;
+        // previously the failure was dropped and 0 was always returned.
+        return runJob(args) < 0 ? 1 : 0;
+    }
+
+    /**
+     * Resolves the connection and table settings from the configuration (falling
+     * back to the field defaults), then runs a map-only job that scans one range
+     * of the partition table and writes mutations through
+     * {@link ShardKeyValueToMutationMapper}, with the shard index table as the
+     * default output table.
+     *
+     * @param args not used
+     * @return the job's {@code REDUCE_OUTPUT_RECORDS} counter on success,
+     *         {@code -1} if the job failed
+     */
+    public long runJob(String[] args) throws Exception {
+        // tuning: no speculative execution, larger sort buffer
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+
+        // connection settings are only read
+        zk = conf.get(CB_ZK_PROP, zk);
+        instance = conf.get(CB_INSTANCE_PROP, instance);
+        userName = conf.get(CB_USERNAME_PROP, userName);
+        pwd = conf.get(CB_PWD_PROP, pwd);
+
+        // table names are written back so defaults reach the map tasks
+        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
+        conf.set(PARTITION_TABLE_PROP, partitionTable);
+        shardIndexTable = conf.get(SHARD_INDEX_TABLE_PROP, shardIndexTable);
+        conf.set(SHARD_INDEX_TABLE_PROP, shardIndexTable);
+
+        final Job job = new Job(conf);
+        job.setJarByClass(MoveShardIndexTool.class);
+
+        // input side: only rows between the URI marker and the plain-literal marker
+        job.setInputFormatClass(CloudbaseInputFormat.class);
+        //TODO: How should I send in Auths?
+        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), partitionTable, CBConstants.NO_AUTHS);
+        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
+        final Range shardIndexRange = new Range(
+                new Text(PartitionConstants.URI_MARKER_STR),
+                new Text(PartitionConstants.PLAIN_LITERAL_MARKER_STR));
+        CloudbaseInputFormat.setRanges(job, Collections.singleton(shardIndexRange));
+
+        job.setMapperClass(ShardKeyValueToMutationMapper.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Mutation.class);
+
+        // output side
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, shardIndexTable);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+
+        job.setNumReduceTasks(0);
+
+        // submit and wait
+        final Date begin = new Date();
+        System.out.println("Job started: " + begin);
+        if (!job.waitForCompletion(true)) {
+            System.out.println("Job Failed!!!");
+            return -1;
+        }
+
+        final Date finish = new Date();
+        System.out.println("Job ended: " + finish);
+        final long elapsedSeconds = (finish.getTime() - begin.getTime()) / 1000;
+        System.out.println("The job took " + elapsedSeconds + " seconds.");
+
+        // NOTE(review): the job runs with zero reduce tasks, so this counter may always be 0.
+        return job
+                .getCounters()
+                .findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS")
+                .getValue();
+    }
+
+    /**
+     * Mapper that copies every scanned entry into the shard index table and,
+     * when the "shard.index.delete" property is true, also emits a delete for
+     * the same entry against the partition table.
+     */
+    public static class ShardKeyValueToMutationMapper extends Mapper<Key, Value, Text, Mutation> {
+        private Text shardTableTxt;
+        private Text partTableTxt;
+        protected boolean deletePrevShardIndex;
+
+        /**
+         * Loads both table names and the delete flag from the job configuration.
+         *
+         * @throws IllegalArgumentException if either table property is missing
+         */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            final String shardTableName = context.getConfiguration().get(SHARD_INDEX_TABLE_PROP);
+            if (shardTableName == null) {
+                throw new IllegalArgumentException("Shard Table property cannot be null");
+            }
+            shardTableTxt = new Text(shardTableName);
+
+            final String partitionTableName = context.getConfiguration().get(PARTITION_TABLE_PROP);
+            if (partitionTableName == null) {
+                throw new IllegalArgumentException("Partition Table property cannot be null");
+            }
+            partTableTxt = new Text(partitionTableName);
+
+            deletePrevShardIndex = context.getConfiguration().getBoolean(SHARD_INDEX_DELETE_PROP, false);
+            System.out.println("Deleting shard index from previous: " + deletePrevShardIndex + " Part: " + partTableTxt);
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+            // copy the entry, keeping its visibility but stamping it with the current time
+            final Mutation copy = new Mutation(key.getRow());
+            copy.put(key.getColumnFamily(), key.getColumnQualifier(),
+                    new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
+            context.write(shardTableTxt, copy);
+
+            if (!deletePrevShardIndex) {
+                return;
+            }
+
+            // remove the original entry from the partition table
+            final Mutation removal = new Mutation(key.getRow());
+            removal.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
+            context.write(partTableTxt, removal);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
new file mode 100644
index 0000000..b347a56
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
@@ -0,0 +1,155 @@
+package mvm.mmrts.rdf.partition.mr.fileinput;
+
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.*;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Be able to input multiple rdf formatted files. Convert from rdf format to statements.
+ * <p/>
+ * Each input file is parsed as a whole with the RDF format named by the
+ * {@link #RDF_FILE_FORMAT} configuration property (RDF/XML when the property is unset).
+ * Every parsed statement becomes one record: the key is a running record number and the
+ * value is the statement serialized with {@link RdfIO#writeStatement}.
+ * Class RdfFileInputFormat
+ * Date: May 16, 2011
+ * Time: 2:11:24 PM
+ */
+public class RdfFileInputFormat extends FileInputFormat<LongWritable, BytesWritable> {
+
+    /** Configuration property holding the name of the RDF format of the input files. */
+    public static final String RDF_FILE_FORMAT = "mvm.mmrts.rdf.cloudbase.sail.mr.fileinput.rdfformat";
+
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
+                                                                        TaskAttemptContext taskAttemptContext)
+            throws IOException, InterruptedException {
+        return new RdfFileRecordReader();
+    }
+
+    /**
+     * Files are never split: the record reader parses the complete file and ignores the
+     * split offsets, so handing one file to several splits would emit every statement
+     * once per split.
+     */
+    @Override
+    protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context, Path filename) {
+        return false;
+    }
+
+    /**
+     * Record reader that parses the whole file during {@link #initialize} and buffers the
+     * serialized statements in memory, then hands them out one at a time.
+     */
+    private static class RdfFileRecordReader extends RecordReader<LongWritable, BytesWritable> implements RDFHandler {
+
+        boolean closed = false;
+        long count = 0;
+        BlockingQueue<BytesWritable> queue = new LinkedBlockingQueue<BytesWritable>();
+        int total = 0;
+
+        // Record exposed by getCurrentKey()/getCurrentValue(); only nextKeyValue() advances it.
+        private LongWritable currentKey;
+        private BytesWritable currentValue;
+
+        /**
+         * Opens the file behind the split, parses it completely and queues every statement.
+         *
+         * @throws IllegalArgumentException if the configured RDF format name is unknown
+         * @throws IOException              if the file cannot be read or parsed
+         */
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+            FileSplit fileSplit = (FileSplit) inputSplit;
+            Configuration conf = taskAttemptContext.getConfiguration();
+            String rdfForm_s = conf.get(RDF_FILE_FORMAT, RDFFormat.RDFXML.getName());
+            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
+            if (rdfFormat == null)
+                throw new IllegalArgumentException("Unknown RDF format[" + rdfForm_s + "] in property " + RDF_FILE_FORMAT);
+
+            Path file = fileSplit.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            FSDataInputStream fileIn = fs.open(file);
+
+            RDFParser rdfParser = Rio.createParser(rdfFormat);
+            rdfParser.setRDFHandler(this);
+            try {
+                rdfParser.parse(fileIn, "");
+            } catch (Exception e) {
+                throw new IOException(e);
+            } finally {
+                // close the stream even when parsing fails
+                fileIn.close();
+            }
+            total = queue.size();
+            //TODO: Make this threaded so that you don't hold too many statements before sending them
+        }
+
+        /**
+         * Advances to the next buffered statement.
+         *
+         * @return true if a record is available through the getters, false when exhausted
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            BytesWritable next = queue.poll();
+            if (next == null) {
+                currentKey = null;
+                currentValue = null;
+                return false;
+            }
+            currentKey = new LongWritable(count++);
+            currentValue = next;
+            return true;
+        }
+
+        /** Returns the record number of the current statement; repeated calls return the same key. */
+        @Override
+        public LongWritable getCurrentKey() throws IOException, InterruptedException {
+            return currentKey;
+        }
+
+        /** Returns the current serialized statement; repeated calls return the same value. */
+        @Override
+        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
+            return currentValue;
+        }
+
+        /** Fraction of buffered statements already handed out; 1 when the file held no statements. */
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            if (total == 0)
+                return 1.0f; // avoid 0/0 = NaN for files without statements
+            return ((float) (total - queue.size())) / ((float) total);
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed = true;
+        }
+
+        @Override
+        public void startRDF() throws RDFHandlerException {
+        }
+
+        @Override
+        public void endRDF() throws RDFHandlerException {
+        }
+
+        @Override
+        public void handleNamespace(String s, String s1) throws RDFHandlerException {
+        }
+
+        /** Serializes the parsed statement and appends it to the in-memory queue. */
+        @Override
+        public void handleStatement(Statement statement) throws RDFHandlerException {
+            try {
+                byte[] stmt_bytes = RdfIO.writeStatement(statement, true);
+                queue.add(new BytesWritable(stmt_bytes));
+            } catch (IOException e) {
+                throw new RDFHandlerException(e);
+            }
+        }
+
+        @Override
+        public void handleComment(String s) throws RDFHandlerException {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
new file mode 100644
index 0000000..12c1a4e
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
@@ -0,0 +1,210 @@
+package mvm.mmrts.rdf.partition.mr.fileinput;
+
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Mutation;
+import com.google.common.io.ByteStreams;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.PartitionConstants.EMPTY_VALUE;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+/**
+ * Do bulk import of rdf files
+ * <p/>
+ * Reads RDF files through {@link RdfFileInputFormat}, groups the statements by subject
+ * and writes the resulting mutations to a Cloudbase table. Connection settings are taken
+ * from the {@code cb.*} configuration properties, falling back to the defaults declared
+ * in this class.
+ * Class RdfFileInputToCloudbaseTool
+ * Date: May 16, 2011
+ * Time: 3:12:16 PM
+ */
+public class RdfFileInputToCloudbaseTool implements Tool {
+
+    public static final String CB_USERNAME_PROP = "cb.username";
+    public static final String CB_PWD_PROP = "cb.pwd";
+    public static final String CB_SERVER_PROP = "cb.server";
+    public static final String CB_PORT_PROP = "cb.port";
+    public static final String CB_INSTANCE_PROP = "cb.instance";
+    public static final String CB_TTL_PROP = "cb.ttl";
+    public static final String CB_TABLE_PROP = "cb.table";
+
+
+    private Configuration conf;
+
+    // Defaults used when the matching cb.* property is not set in the configuration.
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String server = "10.40.190.113";
+    private String port = "2181";
+    private String table = "partitionRdf";
+
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputToCloudbaseTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Configures and runs the import job.
+     *
+     * @param args {@code args[0]} is the input directory
+     * @return the value of the REDUCE_OUTPUT_RECORDS counter on success, -1 if the job failed
+     * @throws IllegalArgumentException if no input directory is given
+     */
+    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        if (args.length < 1)
+            throw new IllegalArgumentException("Usage: RdfFileInputToCloudbaseTool <input directory>");
+
+        //faster
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+
+        server = conf.get(CB_SERVER_PROP, server);
+        port = conf.get(CB_PORT_PROP, port);
+        instance = conf.get(CB_INSTANCE_PROP, instance);
+        userName = conf.get(CB_USERNAME_PROP, userName);
+        pwd = conf.get(CB_PWD_PROP, pwd);
+        table = conf.get(CB_TABLE_PROP, table);
+        // publish the effective table name so the reducer can read it back
+        conf.set(CB_TABLE_PROP, table);
+
+        Job job = new Job(conf);
+        job.setJarByClass(RdfFileInputToCloudbaseTool.class);
+
+        // set up rdf file input
+        job.setInputFormatClass(RdfFileInputFormat.class);
+        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, table);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, server + ":" + port);
+
+        // set mapper and reducer classes
+        job.setMapperClass(OutSubjStmtMapper.class);
+        job.setReducerClass(StatementToMutationReducer.class);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return job
+                    .getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "REDUCE_OUTPUT_RECORDS").getValue();
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    /**
+     * Runs the job and maps its outcome to an exit code.
+     *
+     * @return 0 when the job succeeded, 1 when it failed
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        return runJob(args) < 0 ? 1 : 0;
+    }
+
+    /**
+     * Returns only the valid bytes of the writable. {@code getBytes()} exposes the backing
+     * buffer, which can be longer than {@code getLength()} when the object is reused.
+     */
+    private static byte[] exactBytes(BytesWritable writable) {
+        return java.util.Arrays.copyOf(writable.getBytes(), writable.getLength());
+    }
+
+    /**
+     * Keys every serialized statement by its serialized subject followed by the family
+     * delimiter; the statement bytes are passed through unchanged as the value.
+     */
+    public static class OutSubjStmtMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {
+
+        public OutSubjStmtMapper() {
+        }
+
+        @Override
+        protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
+            Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(exactBytes(value)), ValueFactoryImpl.getInstance());
+            context.write(new Text(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR), value);
+        }
+
+    }
+
+    /**
+     * Turns all statements of one subject into two mutations: one on the shard row holding
+     * the DOC and INDEX entries of every statement, and one on the subject row pointing
+     * at the shard.
+     */
+    public static class StatementToMutationReducer extends Reducer<Text, BytesWritable, Text, Mutation> {
+        private Text outputTable;
+        private DateHashModShardValueGenerator gen;
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            String tableName = context.getConfiguration().get(CB_TABLE_PROP, null);
+            if (tableName == null)
+                throw new IllegalArgumentException("Table property[" + CB_TABLE_PROP + "] cannot be null");
+            outputTable = new Text(tableName);
+            gen = new DateHashModShardValueGenerator();
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
+            // Text.getBytes() is the backing buffer; restrict it to the valid length
+            byte[] key_bytes = java.util.Arrays.copyOf(key.getBytes(), key.getLength());
+            Resource subject = (Resource) RdfIO.readValue(ByteStreams.newDataInput(key_bytes), ValueFactoryImpl.getInstance(), FAMILY_DELIM);
+            byte[] subj_bytes = writeValue(subject);
+            String shard = gen.generateShardValue(subject);
+            Text shard_txt = new Text(shard);
+
+            /**
+             * Triple - >
+             *- < subject ><shard >:
+             *- < shard > event:<subject >\0 < predicate >\0 < object >\0
+             *- < shard > index:<predicate >\1 < object >\0
+             */
+            Mutation m_subj = new Mutation(shard_txt);
+            for (BytesWritable stmt_bytes : values) {
+                Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(exactBytes(stmt_bytes)), ValueFactoryImpl.getInstance());
+                m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE);
+                m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE);
+            }
+
+            /**
+             * TODO: Is this right?
+             * If the subject does not have any authorizations specified, then anyone can access it.
+             * But the true authorization check will happen at the predicate/object level, which means that
+             * the set returned will only be what the person is authorized to see.  The shard lookup table has to
+             * have the lowest level authorization all the predicate/object authorizations; otherwise,
+             * a user may not be able to see the correct document.
+             */
+            Mutation m_shard = new Mutation(new Text(subj_bytes));
+            m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE);
+
+            context.write(outputTable, m_subj);
+            context.write(outputTable, m_shard);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
new file mode 100644
index 0000000..e677d12
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
@@ -0,0 +1,159 @@
+package mvm.mmrts.rdf.partition.mr.fileinput;
+
+import com.google.common.io.ByteStreams;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.FAMILY_DELIM_STR;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+/**
+ * Do bulk import of rdf files
+ * Class RdfFileInputToCloudbaseTool
+ * Date: May 16, 2011
+ * Time: 3:12:16 PM
+ */
+public class RdfFileInputToFileTool implements Tool {
+
+    private Configuration conf;
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(), 
args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public long runJob(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException {
+        if (args.length < 2)
+            throw new IllegalArgumentException("Usage: RdfFileInputToFileTool 
<input directory> <output directory>");
+
+        //faster
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+
+        Job job = new Job(conf);
+        job.setJarByClass(RdfFileInputToFileTool.class);
+
+        // set up cloudbase input
+        job.setInputFormatClass(RdfFileInputFormat.class);
+        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+
+        // set mapper and reducer classes
+        job.setMapperClass(StmtToBytesMapper.class);
+        job.setReducerClass(StmtBytesReducer.class);
+
+        // set output
+        job.setOutputFormatClass(TextOutputFormat.class);
+        Path outputDir = new Path(args[1]);
+        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
+        if (dfs.exists(outputDir))
+            dfs.delete(outputDir, true);
+
+        FileOutputFormat.setOutputPath(job, outputDir);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return job
+                    .getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "REDUCE_OUTPUT_RECORDS").getValue();
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        runJob(args);
+        return 0;
+    }
+
+    public static class StmtToBytesMapper extends Mapper<LongWritable, 
BytesWritable, Text, Text> {
+
+        Text outKey = new Text();
+        Text outValue = new Text();
+
+        public StmtToBytesMapper() {
+        }
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+        }
+
+        @Override
+        protected void map(LongWritable key, BytesWritable value, Context 
context) throws IOException, InterruptedException {
+            Statement statement = 
RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), 
ValueFactoryImpl.getInstance());
+            outKey.set(new String(writeValue(statement.getSubject())) + 
FAMILY_DELIM_STR);
+            outValue.set(value.getBytes());
+            context.write(outKey, outValue);
+        }
+
+    }
+
+    public static class StmtBytesReducer extends Reducer<Text, Text, 
NullWritable, Text> {
+
+        NullWritable outKey = NullWritable.get();
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<Text> values, Context 
context) throws IOException, InterruptedException {
+            for (Text stmt_txt : values) {
+                context.write(outKey, stmt_txt);
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
new file mode 100644
index 0000000..fea882d
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
@@ -0,0 +1,326 @@
+package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Instance;
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
+import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import cloudbase.core.util.TextUtil;
+import com.google.common.base.Preconditions;
+import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
+import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFParser;
+import org.openrdf.rio.ntriples.NTriplesParserFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.Collection;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
+import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
+
+/**
+ * Take large ntrips files and use MapReduce and Cloudbase
+ * Bulk ingest techniques to load into the table in our partition format.
+ * <p/>
+ * Input: NTrips file
+ * Map:
+ * - key : Cloudbase Key for each doc, index and subject-to-shard entry of a triple
+ * - value : empty Value
+ * Partitioner: KeyRangePartitioner, using the current splits of the output table
+ * Reduce:
+ * - key : all the entries for each triple - Cloudbase Key
+ * Class BulkNtripsInputTool
+ * Date: Sep 13, 2011
+ * Time: 10:00:17 AM
+ */
+public class BulkNtripsInputTool extends Configured implements Tool {
+
+    private static DateHashModShardValueGenerator generator = new DateHashModShardValueGenerator();
+    public static final String BASE_MOD = "baseMod";
+
+    /**
+     * Runs the bulk ingest: writes the table's split points to the work directory, runs the
+     * job that produces the files, then imports them into the table.
+     *
+     * @param args cb instance, zookeepers, username, password, output table, hdfs ntrips dir,
+     *             work dir and an optional shard size
+     * @return 0 when the job succeeded and the import was issued, 1 when the job failed
+     * @throws RuntimeException wrapping any exception raised while setting up or running the job
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Preconditions.checkArgument(args.length >= 7, "Usage: hadoop jar jarfile BulkNtripsInputTool <cb instance>" +
+                " <zookeepers> <username> <password> <output table> <hdfs ntrips dir> <work dir> (<shard size>)");
+
+        Configuration conf = getConf();
+        PrintStream out = null;
+        try {
+            Job job = new Job(conf, "Bulk Ingest NTrips to Partition RDF");
+            job.setJarByClass(this.getClass());
+
+            //setting long job
+            job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+            job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
+            job.getConfiguration().set("io.sort.mb", "256");
+
+            job.setInputFormatClass(TextInputFormat.class);
+
+            job.setMapperClass(ParseNtripsMapper.class);
+            job.setMapOutputKeyClass(Key.class);
+            job.setMapOutputValueClass(Value.class);
+
+            job.setCombinerClass(OutStmtMutationsReducer.class);
+            job.setReducerClass(OutStmtMutationsReducer.class);
+            job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
+            CloudbaseFileOutputFormat.setZooKeeperInstance(job, args[0], args[1]);
+
+            Instance instance = new ZooKeeperInstance(args[0], args[1]);
+            String user = args[2];
+            byte[] pass = args[3].getBytes();
+            String tableName = args[4];
+            String inputDir = args[5];
+            String workDir = args[6];
+            if (args.length > 7) {
+                int baseMod = Integer.parseInt(args[7]);
+                generator.setBaseMod(baseMod);
+                // the mappers read the shard size back from the job configuration
+                job.getConfiguration().setInt(BASE_MOD, baseMod);
+            }
+
+            Connector connector = instance.getConnector(user, pass);
+
+            TextInputFormat.setInputPaths(job, new Path(inputDir));
+
+            FileSystem fs = FileSystem.get(conf);
+            Path workPath = new Path(workDir);
+            if (fs.exists(workPath))
+                fs.delete(workPath, true);
+
+            CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
+
+            out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
+
+            Collection<Text> splits = connector.tableOperations().getSplits(tableName, Integer.MAX_VALUE);
+            for (Text split : splits)
+                out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
+
+            job.setNumReduceTasks(splits.size() + 1);
+            // the splits file must be complete before the job is submitted
+            out.close();
+            out = null;
+
+            job.setPartitionerClass(KeyRangePartitioner.class);
+            RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
+
+            if (!job.waitForCompletion(true)) {
+                // do not import the output of a failed job
+                System.err.println("Job Failed!!! Skipping bulk import into table " + tableName);
+                return 1;
+            }
+
+            connector.tableOperations().importDirectory(
+                    tableName,
+                    workDir + "/files",
+                    workDir + "/failures",
+                    20,
+                    4,
+                    false);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (out != null)
+                out.close();
+        }
+
+        return 0;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Deserializes a Text from its serialized form inside a raw byte range.
+     *
+     * @throws RuntimeException wrapping the IOException if the bytes cannot be read
+     */
+    private static Text readText(byte[] bytes, int offset, int length) {
+        DataInputBuffer buffer = new DataInputBuffer();
+        Text text = new Text();
+        try {
+            buffer.reset(bytes, offset, length);
+            text.readFields(buffer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return text;
+    }
+
+    /**
+     * input: ntrips format triple
+     * <p/>
+     * output: three keys per triple, each with an empty value:
+     * the doc entry and the index entry on the shard row from the generator,
+     * and the subject row pointing at the shard
+     */
+    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
+        private static final NTriplesParserFactory N_TRIPLES_PARSER_FACTORY = new NTriplesParserFactory();
+
+        private RDFParser parser;
+
+        /**
+         * Creates the parser once per task. Its handler writes straight to the context
+         * passed in here, so {@link #map} only has to feed lines to the parser.
+         */
+        @Override
+        protected void setup(final Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            generator.setBaseMod(conf.getInt(BASE_MOD, generator.getBaseMod()));
+            parser = N_TRIPLES_PARSER_FACTORY.getParser();
+            parser.setRDFHandler(new RDFHandler() {
+
+                @Override
+                public void startRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void endRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleNamespace(String s, String s1) throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleStatement(Statement statement) throws RDFHandlerException {
+                    try {
+                        Text shard = new Text(generator.generateShardValue(statement.getSubject()));
+
+                        context.write(new Key(shard, DOC, new Text(writeStatement(statement, true))), EMPTY_VALUE);
+                        context.write(new Key(shard, INDEX, new Text(writeStatement(statement, false))), EMPTY_VALUE);
+                        //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once
+                        context.write(new Key(new Text(writeValue(statement.getSubject())), shard, EMPTY_TXT), EMPTY_VALUE);
+                    } catch (Exception e) {
+                        throw new RDFHandlerException(e);
+                    }
+                }
+
+                @Override
+                public void handleComment(String s) throws RDFHandlerException {
+
+                }
+            });
+        }
+
+        /**
+         * Parses one input line; the keys are emitted by the handler installed in setup.
+         *
+         * @throws IOException if the line cannot be parsed or written, with the cause attached
+         */
+        @Override
+        public void map(LongWritable key, Text value, Context output)
+                throws IOException, InterruptedException {
+            try {
+                parser.parse(new StringReader(value.toString()), "");
+            } catch (Exception e) {
+                throw new IOException("Exception occurred parsing ntrips triple[" + value + "]", e);
+            }
+        }
+    }
+
+    /**
+     * Emits each distinct key once with an empty value. Used both as combiner and reducer.
+     */
+    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
+
+        @Override
+        public void reduce(Key key, Iterable<Value> values, Context output)
+                throws IOException, InterruptedException {
+            output.write(key, EMPTY_VALUE);
+        }
+    }
+
+    /**
+     * Compares Text keys by their embedded key only, as extracted by
+     * {@link EmbedKeyRangePartitioner#retrieveEmbedKey(Text)}.
+     */
+    public static class EmbedKeyGroupingComparator implements RawComparator<Text> {
+
+        public EmbedKeyGroupingComparator() {
+
+        }
+
+        @Override
+        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+                           int arg5) {
+            return compare(readText(arg0, arg1, arg2), readText(arg3, arg4, arg5));
+        }
+
+        @Override
+        public int compare(Text a1, Text a2) {
+            return EmbedKeyRangePartitioner.retrieveEmbedKey(a1).compareTo(EmbedKeyRangePartitioner.retrieveEmbedKey(a2));
+        }
+
+    }
+
+    /**
+     * Really it does a normal Text compare
+     */
+    public static class EmbedKeySortComparator implements RawComparator<Text> {
+
+        public EmbedKeySortComparator() {
+
+        }
+
+        @Override
+        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+                           int arg5) {
+            return compare(readText(arg0, arg1, arg2), readText(arg3, arg4, arg5));
+        }
+
+        @Override
+        public int compare(Text a1, Text a2) {
+            return a1.compareTo(a2);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
new file mode 100644
index 0000000..f72c382
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
@@ -0,0 +1,28 @@
+package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
+
+import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Range partitioner that partitions on the embedded key of a composite key:
+ * the bytes that precede the first {@link PartitionConstants#INDEX_DELIM_STR}.
+ * <p/>
+ * Class EmbedKeyRangePartitioner
+ * Date: Sep 13, 2011
+ * Time: 1:49:35 PM
+ */
+public class EmbedKeyRangePartitioner extends RangePartitioner {
+
+    /**
+     * Delegates to the parent partitioner using the embedded key in place of
+     * the complete key.
+     */
+    @Override
+    public int getPartition(Text key, Writable value, int numPartitions) {
+        return super.getPartition(retrieveEmbedKey(key), value, numPartitions);
+    }
+
+    /**
+     * Extracts the embedded key from a composite key.
+     *
+     * @param key the composite key
+     * @return a new Text holding the bytes before the first delimiter, or the
+     *         given key instance itself when the delimiter does not occur
+     */
+    public static Text retrieveEmbedKey(Text key) {
+        final int delimiterAt = key.find(PartitionConstants.INDEX_DELIM_STR);
+        if (delimiterAt >= 0) {
+            final Text embedKey = new Text();
+            embedKey.append(key.getBytes(), 0, delimiterAt);
+            return embedKey;
+        }
+        return key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
new file mode 100644
index 0000000..a83d594
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
@@ -0,0 +1,45 @@
+package mvm.mmrts.rdf.partition.mr.iterators;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import cloudbase.core.iterators.IteratorEnvironment;
+import cloudbase.core.iterators.SortedKeyValueIterator;
+import cloudbase.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import ss.cloudbase.core.iterators.SortedRangeIterator;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * {@link SortedRangeIterator} whose lower and upper bound options are supplied
+ * Base64 encoded (see {@link #encode(String)}) and are decoded in
+ * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)}.
+ * <p/>
+ * Class SortedEncodedRangeIterator
+ * Date: Sep 8, 2011
+ * Time: 6:01:28 PM
+ */
+public class SortedEncodedRangeIterator extends SortedRangeIterator {
+
+    // Encoding and decoding may run in different JVMs, so the charset is pinned
+    // instead of relying on each JVM's platform default. Fully qualified so that
+    // no additional import is needed.
+    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+    /**
+     * Initializes the parent iterator, then replaces the bounds with the
+     * Base64-decoded option values.
+     */
+    @Override
+    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
+        super.init(source, options, env);
+        if (options.containsKey(OPTION_LOWER_BOUND)) {
+            lower = new Text(decode(options.get(OPTION_LOWER_BOUND)));
+        } else {
+            lower = new Text("\u0000");
+        }
+
+        // NOTE(review): a missing upper bound falls back to the same "\u0000"
+        // value as a missing lower bound - confirm this default is intended.
+        if (options.containsKey(OPTION_UPPER_BOUND)) {
+            upper = new Text(decode(options.get(OPTION_UPPER_BOUND)));
+        } else {
+            upper = new Text("\u0000");
+        }
+    }
+
+    /**
+     * Base64 encodes the UTF-8 bytes of the given text.
+     *
+     * @param txt text to encode
+     * @return the Base64 representation
+     */
+    public static String encode(String txt) {
+        return new String(Base64.encodeBase64(txt.getBytes(UTF8)), UTF8);
+    }
+
+    /**
+     * Reverses {@link #encode(String)}.
+     *
+     * @param txt Base64 text
+     * @return the decoded bytes interpreted as UTF-8
+     */
+    public static String decode(String txt) {
+        return new String(Base64.decodeBase64(txt.getBytes(UTF8)), UTF8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
new file mode 100644
index 0000000..e360ca7
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
@@ -0,0 +1,31 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Merges the entries of every MapWritable received for a key into a single
+ * MapWritable and emits it under the same key.
+ * <p/>
+ * Since each subject is located at most on one tablet, we should be able to assume that
+ * no reducer is needed.  The Combine phase should aggregate properly.
+ * <p/>
+ * Class AggregateTriplesBySubjectCombiner
+ * Date: Sep 1, 2011
+ * Time: 5:39:24 PM
+ */
+public class AggregateTriplesBySubjectCombiner extends Reducer<Text, MapWritable, Text, MapWritable> {
+    // Reused across reduce() calls; must be emptied at the start of each call.
+    private final MapWritable mwout = new MapWritable();
+
+    /**
+     * Writes one merged map per key. When several input maps contain the same
+     * entry key, the value seen last wins.
+     */
+    @Override
+    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
+        // Drop whatever the previous key left behind, otherwise its entries
+        // would be written out again for this key.
+        mwout.clear();
+        for (MapWritable value : values) {
+            for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
+                mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()),
+                        WritableUtils.clone(entry.getValue(), context.getConfiguration()));
+            }
+        }
+        context.write(key, mwout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
new file mode 100644
index 0000000..2ea5fa8
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
@@ -0,0 +1,37 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.SELECT_FILTER;
+
+/**
+ * Merges the entries of every MapWritable received for a key into a single
+ * MapWritable and emits it keyed by the hash code of the input key.
+ * <p/>
+ * Since each subject is located at most on one tablet, we should be able to assume that
+ * no reducer is needed.  The Combine phase should aggregate properly.
+ * <p/>
+ * Class AggregateTriplesBySubjectReducer
+ * Date: Sep 1, 2011
+ * Time: 5:39:24 PM
+ */
+public class AggregateTriplesBySubjectReducer extends Reducer<Text, MapWritable, LongWritable, MapWritable> {
+    // Both output objects are reused across reduce() calls.
+    private final LongWritable lwout = new LongWritable();
+    private final MapWritable mwout = new MapWritable();
+
+    /**
+     * Writes one merged map per key. When several input maps contain the same
+     * entry key, the value seen last wins.
+     */
+    @Override
+    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
+        // Drop whatever the previous key left behind, otherwise its entries
+        // would be written out again for this key.
+        mwout.clear();
+        for (MapWritable value : values) {
+            for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
+                mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()),
+                        WritableUtils.clone(entry.getValue(), context.getConfiguration()));
+            }
+        }
+        lwout.set(key.hashCode());
+        context.write(lwout, mwout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
new file mode 100644
index 0000000..0630501
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
@@ -0,0 +1,78 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import mvm.mmrts.rdf.partition.PartitionConstants;
+import mvm.mmrts.rdf.partition.utils.RdfIO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.openrdf.model.Statement;
+
+import java.io.IOException;
+import java.util.*;
+
+import static 
mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*;
+
+/**
+ * Will take a triple and output: {@code <subject, predObj map>}
+ * <p/>
+ * Only triples whose predicate is listed in the SELECT_FILTER configuration
+ * (and has a name configured for it) are emitted.
+ * <p/>
+ * Class KeyValueToMapWrMapper
+ * Date: Sep 1, 2011
+ * Time: 4:56:42 PM
+ */
+public class KeyValueToMapWrMapper extends Mapper<Key, Value, Text, MapWritable> {
+
+    // Map key under which the subject itself is added to each output map;
+    // stays null when SUBJECT_NAME is not configured.
+    private Text subjNameTxt;
+
+    // Output objects reused for every record.
+    private final Text keyout = new Text();
+    private final Text predout = new Text();
+    private final Text objout = new Text();
+
+    // predicate value -> name the predicate's object is emitted under
+    private final Map<String, String> predValueName = new HashMap<String, String>();
+
+    /**
+     * Loads the predicate filter and the optional subject name from the job
+     * configuration.
+     */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        //find the values to filter on
+        Configuration conf = context.getConfiguration();
+        String[] filter = conf.getStrings(SELECT_FILTER);
+        if (filter != null) {
+            for (String predValue : filter) {
+                // each filtered predicate value is itself a configuration key
+                // whose value is the output name
+                String predName = conf.get(predValue);
+                if (predName != null) {
+                    predValueName.put(predValue, predName);
+                }
+            }
+        }
+
+        String subjName = conf.get(SUBJECT_NAME);
+        if (subjName != null) {
+            //not sure it will ever be null
+            subjNameTxt = new Text(subjName);
+        }
+    }
+
+    /**
+     * Reads the statement stored in the key's column qualifier and, when its
+     * predicate passes the filter, writes the subject together with a map of
+     * predicate name to object (plus subject name to subject, if configured).
+     */
+    @Override
+    protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), PartitionConstants.VALUE_FACTORY);
+        String predName = predValueName.get(stmt.getPredicate().stringValue());
+        if (predName == null) {
+            // predicate was not selected
+            return;
+        }
+
+        keyout.set(stmt.getSubject().stringValue());
+        predout.set(predName);
+        objout.set(stmt.getObject().stringValue());
+        MapWritable mw = new MapWritable();
+        mw.put(predout, objout);
+        if (subjNameTxt != null) {
+            mw.put(subjNameTxt, keyout);
+        }
+        context.write(keyout, mw);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
----------------------------------------------------------------------
diff --git 
a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
new file mode 100644
index 0000000..56014f9
--- /dev/null
+++ 
b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
@@ -0,0 +1,118 @@
+package mvm.mmrts.rdf.partition.mr.transform;
+
+import cloudbase.core.util.ArgumentChecker;
+import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor;
+import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.QueryParser;
+import org.openrdf.query.parser.sparql.SPARQLParserFactory;
+
+/**
+ * Class SparqlCloudbaseIFJob
+ * Date: Sep 1, 2011
+ * Time: 6:04:35 PM
+ */
+public class SparqlCloudbaseIFJob {
+
+    private String[] queries;
+    private String table;
+
+    //Cloudbase properties
+    private String userName;
+    private String pwd;
+    private String instance;
+    private String zk;
+    //
+
+    private Class classOriginal; //Calling class for this job.
+    private String outputPath;
+
+    public SparqlCloudbaseIFJob(String table, String userName, String pwd, 
String instance, String zk,
+                                String outputPath, Class classOriginal, 
String... queries) {
+        ArgumentChecker.notNull(queries);
+        this.queries = queries;
+        this.table = table;
+        this.userName = userName;
+        this.pwd = pwd;
+        this.instance = instance;
+        this.zk = zk;
+        this.outputPath = outputPath;
+        this.classOriginal = classOriginal;
+    }
+
+    public String[] run() throws Exception {
+        int count = 0;
+        outputPath = outputPath + "/results/";
+        String[] resultsOut = new String[queries.length];
+
+        for (String query : queries) {
+            QueryParser parser = (new SPARQLParserFactory()).getParser();
+            TupleExpr expr = parser.parseQuery(query, 
"http://www.w3.org/1999/02/22-rdf-syntax-ns#";).getTupleExpr();
+
+            final Configuration queryConf = new Configuration();
+            expr.visit(new FilterTimeIndexVisitor(queryConf));
+
+            (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, 
null);
+
+            //make sure of only one shardlookup
+            expr.visit(new QueryModelVisitorBase<RuntimeException>() {
+                int count = 0;
+
+                @Override
+                public void meetOther(QueryModelNode node) throws 
RuntimeException {
+                    super.meetOther(node);
+                    count++;
+                    if (count > 1)
+                        throw new IllegalArgumentException("Query can only 
have one subject-star lookup");
+                }
+            });
+
+            final Job job = new Job(queryConf);
+            job.setJarByClass(classOriginal);
+            job.setJobName("SparqlCloudbaseIFTransformer. Query: " + 
((query.length() > 32) ? (query.substring(0, 32)) : (query)));
+
+            expr.visit(new QueryModelVisitorBase<RuntimeException>() {
+                @Override
+                public void meetOther(QueryModelNode node) throws 
RuntimeException {
+                    super.meetOther(node);
+
+                    //set up CloudbaseBatchScannerInputFormat here
+                    if (node instanceof ShardSubjectLookup) {
+                        System.out.println("Lookup: " + node);
+                        try {
+                            new 
SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, table,
+                                    userName, pwd, instance, zk);
+                        } catch (QueryEvaluationException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+
+
+            String resultOutPath = outputPath + "/result-" + count;
+            resultsOut[count] = resultOutPath;
+            Path outputDir = new Path(resultOutPath);
+            FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf);
+            if (dfs.exists(outputDir))
+                dfs.delete(outputDir, true);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            SequenceFileOutputFormat.setOutputPath(job, outputDir);
+
+
+            // Submit the job
+            job.waitForCompletion(true);
+            count++;
+        }
+        return resultsOut;
+    }
+}

Reply via email to