http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
new file mode 100644
index 0000000..c03b124
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
@@ -0,0 +1,318 @@
+package mvm.rya.cloudbase.mr.fileinput;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.client.admin.TableOperations;
+import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
+import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import cloudbase.core.util.TextUtil;
+import com.google.common.base.Preconditions;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolver;
+import mvm.rya.cloudbase.CloudbaseRdfConstants;
+import mvm.rya.cloudbase.mr.utils.MRUtils;
+import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.*;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.cloudbase.CloudbaseRdfUtils.extractValue;
+import static mvm.rya.cloudbase.CloudbaseRdfUtils.from;
+
+/**
+ * Takes large N-Triples files and uses MapReduce and Cloudbase
+ * bulk-ingest techniques to load them into the tables in our partition format.
+ * <p/>
+ * Input: NTrips file
+ * Map:
+ * - key : shard row - Text
+ * - value : stmt in doc triple format - Text
+ * Partitioner: RangePartitioner
+ * Reduce:
+ * - key : all the entries for each triple - Cloudbase Key
+ * Class BulkNtripsInputTool
+ * Date: Sep 13, 2011
+ * Time: 10:00:17 AM
+ */
+public class BulkNtripsInputTool extends Configured implements Tool {
+
+    public static final String WORKDIR_PROP = "bulk.n3.workdir";
+
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.129:2181";
+    private String ttl = null;
+    private String workDirBase = "/temp/bulkcb/work";
+    private String format = RDFFormat.NTRIPLES.getName();
+
+    @Override
+    public int run(final String[] args) throws Exception {
+        final Configuration conf = getConf();
+        try {
+            //conf
+            zk = conf.get(MRUtils.CB_ZK_PROP, zk);
+            ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
+            instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
+            userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
+            pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
+            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
+            format = conf.get(MRUtils.FORMAT_PROP, format);
+            conf.set(MRUtils.FORMAT_PROP, format);
+            final String inputDir = args[0];
+
+            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
+            Connector connector = zooKeeperInstance.getConnector(userName, pwd);
+            TableOperations tableOperations = connector.tableOperations();
+
+            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+            if (tablePrefix != null)
+                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
+                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
+                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
+            Collection<Job> jobs = new ArrayList<Job>();
+            for (final String tableName : tables) {
+                PrintStream out = null;
+                try {
+                    String workDir = workDirBase + "/" + tableName;
+                    System.out.println("Loading data into table[" + tableName + "]");
+
+                    Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
+                    job.setJarByClass(this.getClass());
+                    //setting long job
+                    Configuration jobConf = job.getConfiguration();
+                    jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
+                    jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+                    jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
+                    jobConf.setBoolean("mapred.compress.map.output", true);
+//                    jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
+
+                    job.setInputFormatClass(TextInputFormat.class);
+
+                    job.setMapperClass(ParseNtripsMapper.class);
+                    job.setMapOutputKeyClass(Key.class);
+                    job.setMapOutputValueClass(Value.class);
+
+                    job.setCombinerClass(OutStmtMutationsReducer.class);
+                    job.setReducerClass(OutStmtMutationsReducer.class);
+                    job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
+                    CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
+
+                    jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
+
+                    TextInputFormat.setInputPaths(job, new Path(inputDir));
+
+                    FileSystem fs = FileSystem.get(conf);
+                    Path workPath = new Path(workDir);
+                    if (fs.exists(workPath))
+                        fs.delete(workPath, true);
+
+                    CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
+
+                    out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
+
+                    if (!tableOperations.exists(tableName))
+                        tableOperations.create(tableName);
+                    Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
+                    for (Text split : splits)
+                        out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
+
+                    job.setNumReduceTasks(splits.size() + 1);
+                    out.close();
+
+                    job.setPartitionerClass(KeyRangePartitioner.class);
+                    RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
+
+                    jobConf.set(WORKDIR_PROP, workDir);
+
+                    job.submit();
+                    jobs.add(job);
+
+                } catch (Exception re) {
+                    throw new RuntimeException(re);
+                } finally {
+                    if (out != null)
+                        out.close();
+                }
+            }
+
+            for (Job job : jobs) {
+                while (!job.isComplete()) {
+                    Thread.sleep(1000);
+                }
+            }
+
+            for (String tableName : tables) {
+                String workDir = workDirBase + "/" + tableName;
+                tableOperations.importDirectory(
+                        tableName,
+                        workDir + "/files",
+                        workDir + "/failures",
+                        20,
+                        4,
+                        false);
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return 0;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * input: ntrips format triple
+     * <p/>
+     * output: key: shard row from generator
+     * value: stmt in serialized format (document format)
+     */
+    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
+        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
+
+        private RDFParser parser;
+        private String rdfFormat;
+        private String namedGraph;
+        private RyaContext ryaContext = RyaContext.getInstance();
+        private TripleRowResolver rowResolver = ryaContext.getTripleResolver();
+
+        @Override
+        protected void setup(final Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            final String table = conf.get(TABLE_PROPERTY);
+            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
+
+            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
+            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
+            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
+            checkNotNull(rdfFormat, "Rdf format cannot be null");
+
+            namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
+
+            parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
+            parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
+            parser.setRDFHandler(new RDFHandler() {
+
+                @Override
+                public void startRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void endRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleNamespace(String s, String s1) throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleStatement(Statement statement) throws RDFHandlerException {
+                    try {
+                        RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
+                        if(rs.getColumnVisibility() == null) {
+                            rs.setColumnVisibility(cv);
+                        }
+
+                       // Inject the specified context into the statement.
+                        if(namedGraph != null){
+                            rs.setContext(new RyaURI(namedGraph));
+                        } 
+
+                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs);
+
+                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
+                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+                            context.write(
+                                    from(tripleRow),
+                                    extractValue(tripleRow)
+                            );
+                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
+                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+                            context.write(
+                                    from(tripleRow),
+                                    extractValue(tripleRow)
+                            );
+                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
+                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+                            context.write(
+                                    from(tripleRow),
+                                    extractValue(tripleRow)
+                            );
+                        } else
+                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
+
+                    } catch (Exception e) {
+                        throw new RDFHandlerException(e);
+                    }
+                }
+
+                @Override
+                public void handleComment(String s) throws RDFHandlerException {
+
+                }
+            });
+        }
+
+        @Override
+        public void map(LongWritable key, Text value, Context output)
+                throws IOException, InterruptedException {
+            String rdf = value.toString();
+            try {
+                parser.parse(new StringReader(rdf), "");
+            } catch (RDFParseException e) {
+                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
+            }
+        }
+    }
+
+    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
+
+        public void reduce(Key key, Iterable<Value> values, Context output)
+                throws IOException, InterruptedException {
+            output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
+        }
+    }
+}
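
For context, a minimal driver sketch (not part of this commit) showing one way the bulk loader above might be invoked programmatically. The instance name, ZooKeeper address, credentials, table prefix, work directory, and input path below are placeholders; the property constants are the ones added above in MRUtils and BulkNtripsInputTool.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.openrdf.rio.RDFFormat;
import mvm.rya.cloudbase.mr.fileinput.BulkNtripsInputTool;
import mvm.rya.cloudbase.mr.utils.MRUtils;

public class BulkNtripsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cloudbase connection settings read by BulkNtripsInputTool.run(); values are placeholders.
        conf.set(MRUtils.CB_INSTANCE_PROP, "stratus");
        conf.set(MRUtils.CB_ZK_PROP, "zoo1:2181");
        conf.set(MRUtils.CB_USERNAME_PROP, "root");
        conf.set(MRUtils.CB_PWD_PROP, "password");
        // Table prefix and the RDF serialization of the input files.
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");
        conf.set(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName());
        // HDFS scratch area where the bulk files and splits.txt are written.
        conf.set(BulkNtripsInputTool.WORKDIR_PROP, "/temp/bulkcb/work");
        // The tool's first argument is the HDFS directory holding the N-Triples files.
        int rc = ToolRunner.run(conf, new BulkNtripsInputTool(), new String[]{"/data/ntriples"});
        System.exit(rc);
    }
}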

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
new file mode 100644
index 0000000..5aed4a2
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
@@ -0,0 +1,230 @@
+package mvm.rya.cloudbase.mr.fileinput;
+
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.cloudbase.CloudbaseRdfConstants;
+import mvm.rya.cloudbase.RyaTableMutationsFactory;
+import mvm.rya.cloudbase.mr.utils.MRUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.*;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Do bulk import of rdf files
+ * Class RdfFileInputByLineTool
+ * Date: May 16, 2011
+ * Time: 3:12:16 PM
+ */
+public class RdfFileInputByLineTool implements Tool {
+
+    private Configuration conf = new Configuration();
+
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.113:2181";
+    private String tablePrefix = null;
+    private RDFFormat format = RDFFormat.NTRIPLES;
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+        conf.setLong("mapred.task.timeout", 600000000);
+
+        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
+        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
+        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
+        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
+        format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName()));
+
+        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+
+        Job job = new Job(conf);
+        job.setJarByClass(RdfFileInputByLineTool.class);
+
+        // set up cloudbase input
+        job.setInputFormatClass(TextInputFormat.class);
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Mutation.class);
+//        job.setOutputKeyClass(LongWritable.class);
+//        job.setOutputValueClass(StatementWritable.class);
+
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+
+        // set mapper and reducer classes
+        job.setMapperClass(TextToMutationMapper.class);
+        job.setNumReduceTasks(0);
+//        job.setReducerClass(Reducer.class);
+
+        // set output
+//        Path outputDir = new Path("/temp/sparql-out/testout");
+//        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
+//        if (dfs.exists(outputDir))
+//            dfs.deleteMutation(outputDir, true);
+//
+//        FileOutputFormat.setOutputPath(job, outputDir);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return job
+                    .getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "REDUCE_OUTPUT_RECORDS").getValue();
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        return (int) runJob(args);
+    }
+
+    public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
+        protected RDFParser parser;
+        private String prefix;
+        private RDFFormat rdfFormat;
+        protected Text spo_table;
+        private Text po_table;
+        private Text osp_table;
+        private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
+
+        public TextToMutationMapper() {
+        }
+
+        @Override
+        protected void setup(final Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+            if (prefix != null) {
+                RdfCloudTripleStoreConstants.prefixTables(prefix);
+            }
+
+            spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+            po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+            osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+
+            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
+            if (cv_s != null)
+                cv = cv_s.getBytes();
+
+            rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
+            parser = Rio.createParser(rdfFormat);
+            final RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
+
+            parser.setRDFHandler(new RDFHandler() {
+
+                @Override
+                public void startRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void endRDF() throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleNamespace(String s, String s1) throws RDFHandlerException {
+
+                }
+
+                @Override
+                public void handleStatement(Statement statement) throws RDFHandlerException {
+                    try {
+                        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
+                        if(ryaStatement.getColumnVisibility() == null) {
+                            ryaStatement.setColumnVisibility(cv);
+                        }
+                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
+                                mut.serialize(ryaStatement);
+                        Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+                        Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+                        Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+
+                        for (Mutation m : spo) {
+                            context.write(spo_table, m);
+                        }
+                        for (Mutation m : po) {
+                            context.write(po_table, m);
+                        }
+                        for (Mutation m : osp) {
+                            context.write(osp_table, m);
+                        }
+                    } catch (Exception e) {
+                        throw new RDFHandlerException(e);
+                    }
+                }
+
+                @Override
+                public void handleComment(String s) throws RDFHandlerException {
+
+                }
+            });
+        }
+
+        @Override
+        protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
+            try {
+                parser.parse(new StringReader(value.toString()), "");
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
new file mode 100644
index 0000000..54f9a13
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
@@ -0,0 +1,115 @@
+package mvm.rya.cloudbase.mr.fileinput;
+
+import mvm.rya.api.domain.utils.RyaStatementWritable;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.cloudbase.mr.utils.MRUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.*;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Reads multiple RDF-formatted files and converts them from RDF format into statements.
+ * Class RdfFileInputFormat
+ * Date: May 16, 2011
+ * Time: 2:11:24 PM
+ */
+public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
+
+    @Override
+    public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
+                                                                               TaskAttemptContext taskAttemptContext)
+            throws IOException, InterruptedException {
+        return new RdfFileRecordReader();
+    }
+
+    private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
+
+        boolean closed = false;
+        long count = 0;
+        BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>();
+        int total = 0;
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+            FileSplit fileSplit = (FileSplit) inputSplit;
+            Configuration conf = taskAttemptContext.getConfiguration();
+            String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); //default to RDF/XML
+            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
+
+            Path file = fileSplit.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            FSDataInputStream fileIn = fs.open(fileSplit.getPath());
+
+            RDFParser rdfParser = Rio.createParser(rdfFormat);
+            rdfParser.setRDFHandler(this);
+            try {
+                rdfParser.parse(fileIn, "");
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+            fileIn.close();
+            total = queue.size();
+            //TODO: Make this threaded so that you don't hold too many statements before sending them
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return queue.size() > 0;
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException, InterruptedException {
+            return new LongWritable(count++);
+        }
+
+        @Override
+        public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
+            return queue.poll();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return ((float) (total - queue.size())) / ((float) total);
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed = true;
+        }
+
+        @Override
+        public void startRDF() throws RDFHandlerException {
+        }
+
+        @Override
+        public void endRDF() throws RDFHandlerException {
+        }
+
+        @Override
+        public void handleNamespace(String s, String s1) throws RDFHandlerException {
+        }
+
+        @Override
+        public void handleStatement(Statement statement) throws RDFHandlerException {
+            queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement)));
+        }
+
+        @Override
+        public void handleComment(String s) throws RDFHandlerException {
+        }
+    }
+
+}
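
The TODO in initialize() above notes that this reader parses the whole split into memory before the first nextKeyValue() call. Below is a minimal standalone sketch (not part of this commit) of the threaded alternative the TODO suggests, assuming the same Sesame parser APIs used above plus org.openrdf.rio.helpers.RDFHandlerBase; a bounded queue blocks the parser thread when it fills instead of buffering every statement up front.

import java.io.ByteArrayInputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.openrdf.model.Statement;
import org.openrdf.rio.RDFFormat;
import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.RDFParser;
import org.openrdf.rio.Rio;
import org.openrdf.rio.helpers.RDFHandlerBase;

import mvm.rya.api.domain.utils.RyaStatementWritable;
import mvm.rya.api.resolver.RdfToRyaConversions;

public class BoundedQueueParseSketch {
    public static void main(String[] args) throws Exception {
        // Bounded queue: put() blocks when full, so memory use stays flat.
        final BlockingQueue<RyaStatementWritable> queue =
                new ArrayBlockingQueue<RyaStatementWritable>(10000);

        final RDFParser parser = Rio.createParser(RDFFormat.NTRIPLES);
        parser.setRDFHandler(new RDFHandlerBase() {
            @Override
            public void handleStatement(Statement statement) throws RDFHandlerException {
                try {
                    queue.put(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement)));
                } catch (InterruptedException e) {
                    throw new RDFHandlerException(e);
                }
            }
        });

        // Producer thread: in the record reader this would parse the split's input stream.
        final String ntriples = "<http://example.org/s> <http://example.org/p> <http://example.org/o> .\n";
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    parser.parse(new ByteArrayInputStream(ntriples.getBytes("UTF-8")), "");
                } catch (Exception e) {
                    e.printStackTrace(); // a real reader would propagate this to nextKeyValue()
                }
            }
        });
        producer.start();

        // Consumer side: nextKeyValue()/getCurrentValue() would poll until the producer finishes.
        producer.join();
        RyaStatementWritable writable;
        while ((writable = queue.poll()) != null) {
            System.out.println(writable.getRyaStatement());
        }
    }
}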

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
new file mode 100644
index 0000000..f48cbae
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
@@ -0,0 +1,185 @@
+package mvm.rya.cloudbase.mr.fileinput;
+
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.utils.RyaStatementWritable;
+import mvm.rya.cloudbase.CloudbaseRdfConstants;
+import mvm.rya.cloudbase.RyaTableMutationsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.rio.RDFFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+
+import static mvm.rya.cloudbase.mr.utils.MRUtils.*;
+
+/**
+ * Do bulk import of rdf files
+ * Class RdfFileInputTool
+ * Date: May 16, 2011
+ * Time: 3:12:16 PM
+ */
+public class RdfFileInputTool implements Tool {
+
+    private Configuration conf;
+
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.113:2181";
+    private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+    private String format = RDFFormat.RDFXML.getName();
+
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        //faster
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+
+        zk = conf.get(CB_ZK_PROP, zk);
+        instance = conf.get(CB_INSTANCE_PROP, instance);
+        userName = conf.get(CB_USERNAME_PROP, userName);
+        pwd = conf.get(CB_PWD_PROP, pwd);
+
+        tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, tablePrefix);
+        format = conf.get(FORMAT_PROP, format);
+        conf.set(FORMAT_PROP, format);
+
+        Job job = new Job(conf);
+        job.setJarByClass(RdfFileInputTool.class);
+
+        // set up cloudbase input
+        job.setInputFormatClass(RdfFileInputFormat.class);
+        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(RyaStatementWritable.class);
+//        job.setOutputKeyClass(LongWritable.class);
+//        job.setOutputValueClass(StatementWritable.class);
+
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+
+        // set mapper and reducer classes
+        job.setMapperClass(StatementToMutationMapper.class);
+        job.setNumReduceTasks(0);
+//        job.setReducerClass(Reducer.class);
+
+        // set output
+//        Path outputDir = new Path("/temp/sparql-out/testout");
+//        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
+//        if (dfs.exists(outputDir))
+//            dfs.deleteMutation(outputDir, true);
+//
+//        FileOutputFormat.setOutputPath(job, outputDir);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return job
+                    .getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "REDUCE_OUTPUT_RECORDS").getValue();
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        runJob(args);
+        return 0;
+    }
+
+    public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> {
+        protected String tablePrefix;
+        protected Text spo_table;
+        protected Text po_table;
+        protected Text osp_table;
+        private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
+        RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
+
+        public StatementToMutationMapper() {
+        }
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+            po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+            osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+
+            final String cv_s = conf.get(CB_CV_PROP);
+            if (cv_s != null)
+                cv = cv_s.getBytes();
+        }
+
+        @Override
+        protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
+            RyaStatement statement = value.getRyaStatement();
+            if (statement.getColumnVisibility() == null) {
+                statement.setColumnVisibility(cv);
+            }
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
+                    mut.serialize(statement);
+            Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+            Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+
+            for (Mutation m : spo) {
+                context.write(spo_table, m);
+            }
+            for (Mutation m : po) {
+                context.write(po_table, m);
+            }
+            for (Mutation m : osp) {
+                context.write(osp_table, m);
+            }
+        }
+
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
new file mode 100644
index 0000000..5d7d971
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
@@ -0,0 +1,314 @@
+//package mvm.rya.cloudbase.mr.fileinput;
+//
+//import cloudbase.core.client.Connector;
+//import cloudbase.core.client.ZooKeeperInstance;
+//import cloudbase.core.client.admin.TableOperations;
+//import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
+//import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
+//import cloudbase.core.data.Key;
+//import cloudbase.core.data.Value;
+//import cloudbase.core.util.TextUtil;
+//import com.google.common.base.Preconditions;
+//import mvm.rya.api.RdfCloudTripleStoreConstants;
+//import mvm.rya.cloudbase.CloudbaseRdfConstants;
+//import mvm.rya.cloudbase.RyaTableKeyValues;
+//import mvm.rya.cloudbase.mr.utils.MRUtils;
+//import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
+//import mvm.rya.cloudbase.utils.shard.HashAlgorithm;
+//import mvm.rya.cloudbase.utils.shard.HashCodeHashAlgorithm;
+//import org.apache.commons.codec.binary.Base64;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.conf.Configured;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.hadoop.mapreduce.Mapper;
+//import org.apache.hadoop.mapreduce.Reducer;
+//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+//import org.apache.hadoop.util.Tool;
+//import org.apache.hadoop.util.ToolRunner;
+//import org.openrdf.model.Resource;
+//import org.openrdf.model.Statement;
+//import org.openrdf.rio.*;
+//
+//import java.io.BufferedOutputStream;
+//import java.io.IOException;
+//import java.io.PrintStream;
+//import java.io.StringReader;
+//import java.util.ArrayList;
+//import java.util.Collection;
+//import java.util.Map;
+//
+//import static com.google.common.base.Preconditions.checkNotNull;
+//
+///**
+//* Take large ntrips files and use MapReduce and Cloudbase
+//* Bulk ingest techniques to load into the table in our partition format.
+//* Uses a sharded scheme
+//* <p/>
+//* Input: NTrips file
+//* Map:
+//* - key : shard row - Text
+//* - value : stmt in doc triple format - Text
+//* Partitioner: RangePartitioner
+//* Reduce:
+//* - key : all the entries for each triple - Cloudbase Key
+//* Class BulkNtripsInputTool
+//* Date: Sep 13, 2011
+//* Time: 10:00:17 AM
+//*/
+//public class ShardedBulkNtripsInputTool extends Configured implements Tool {
+//
+//    public static final String WORKDIR_PROP = "bulk.n3.workdir";
+//    public static final String BULK_N3_NUMSHARD = "bulk.n3.numshard";
+//
+//    private String userName = "root";
+//    private String pwd = "password";
+//    private String instance = "stratus";
+//    private String zk = "10.40.190.129:2181";
+//    private String ttl = null;
+//    private String workDirBase = "/temp/bulkcb/work";
+//    private String format = RDFFormat.NTRIPLES.getName();
+//    private int numShards;
+//
+//    @Override
+//    public int run(final String[] args) throws Exception {
+//        final Configuration conf = getConf();
+//        try {
+//            //conf
+//            zk = conf.get(MRUtils.CB_ZK_PROP, zk);
+//            ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
+//            instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
+//            userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
+//            pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
+//            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
+//            format = conf.get(MRUtils.FORMAT_PROP, format);
+//            String numShards_s = conf.get(BULK_N3_NUMSHARD);
+//            Preconditions.checkArgument(numShards_s != null);
+//            numShards = Integer.parseInt(numShards_s);
+//            conf.set(MRUtils.FORMAT_PROP, format);
+//            final String inputDir = args[0];
+//
+//            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
+//            Connector connector = zooKeeperInstance.getConnector(userName, pwd);
+//            TableOperations tableOperations = connector.tableOperations();
+//
+//            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+//            if (tablePrefix != null)
+//                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+//            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
+//                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
+//                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
+//            Collection<Job> jobs = new ArrayList<Job>();
+//            for (final String table : tables) {
+//                for (int i = 0; i < numShards; i++) {
+//                    final String tableName = table + i;
+//                    PrintStream out = null;
+//                    try {
+//                        String workDir = workDirBase + "/" + tableName;
+//                        System.out.println("Loading data into table[" + tableName + "]");
+//
+//                        Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
+//                        job.setJarByClass(this.getClass());
+//                        //setting long job
+//                        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+//                        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
+//                        job.getConfiguration().set("io.sort.mb", "256");
+//                        job.getConfiguration().setBoolean("mapred.compress.map.output", true);
+//                        job.getConfiguration().set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
+//
+//                        job.setInputFormatClass(TextInputFormat.class);
+//
+//                        job.setMapperClass(ShardedParseNtripsMapper.class);
+//                        job.setMapOutputKeyClass(Key.class);
+//                        job.setMapOutputValueClass(Value.class);
+//
+//                        job.setCombinerClass(OutStmtMutationsReducer.class);
+//                        job.setReducerClass(OutStmtMutationsReducer.class);
+//                        job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
+//                        CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
+//
+//                        job.getConfiguration().set(ShardedParseNtripsMapper.TABLE_PROPERTY, tableName);
+//                        job.getConfiguration().set(ShardedParseNtripsMapper.SHARD_PROPERTY, i + "");
+//
+//                        TextInputFormat.setInputPaths(job, new Path(inputDir));
+//
+//                        FileSystem fs = FileSystem.get(conf);
+//                        Path workPath = new Path(workDir);
+//                        if (fs.exists(workPath))
+//                            fs.deleteMutation(workPath, true);
+//
+//                        CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
+//
+//                        out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
+//
+//                        if (!tableOperations.exists(tableName))
+//                            tableOperations.create(tableName);
+//                        Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
+//                        for (Text split : splits)
+//                            out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
+//
+//                        job.setNumReduceTasks(splits.size() + 1);
+//                        out.close();
+//
+//                        job.setPartitionerClass(KeyRangePartitioner.class);
+//                        RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
+//
+//                        job.getConfiguration().set(WORKDIR_PROP, workDir);
+//
+//                        job.submit();
+//                        jobs.add(job);
+//
+//                    } catch (Exception re) {
+//                        throw new RuntimeException(re);
+//                    } finally {
+//                        if (out != null)
+//                            out.close();
+//                    }
+//                }
+//            }
+//
+//            for (Job job : jobs) {
+//                while (!job.isComplete()) {
+//                    Thread.sleep(1000);
+//                }
+//            }
+//
+//            for (String table : tables) {
+//                for (int i = 0; i < numShards; i++) {
+//                    final String tableName = table + i;
+//                    String workDir = workDirBase + "/" + tableName;
+//                    tableOperations.importDirectory(
+//                            tableName,
+//                            workDir + "/files",
+//                            workDir + "/failures",
+//                            20,
+//                            4,
+//                            false);
+//                }
+//            }
+//
+//        } catch (Exception e) {
+//            throw new RuntimeException(e);
+//        }
+//
+//        return 0;
+//    }
+//
+//    public static void main(String[] args) {
+//        try {
+//            ToolRunner.run(new Configuration(), new ShardedBulkNtripsInputTool(), args);
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    /**
+//     * input: ntrips format triple
+//     * <p/>
+//     * output: key: shard row from generator
+//     * value: stmt in serialized format (document format)
+//     */
+//    public static class ShardedParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
+//        public static final String TABLE_PROPERTY = "shardedparsentripsmapper.table";
+//        public static final String SHARD_PROPERTY = "shardedparsentripsmapper.shard";
+//
+//        private RDFParser parser;
+//        private String rdfFormat;
+//        private HashAlgorithm hashAlgorithm = new HashCodeHashAlgorithm();
+//        private int shard;
+//        private int numShards;
+//
+//        @Override
+//        protected void setup(final Context context) throws IOException, InterruptedException {
+//            super.setup(context);
+//            Configuration conf = context.getConfiguration();
+//            final String table = conf.get(TABLE_PROPERTY);
+//            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
+//
+//            String shard_s = conf.get(SHARD_PROPERTY);
+//            Preconditions.checkNotNull(shard_s, "Set the " + SHARD_PROPERTY + " property");
+//            shard = Integer.parseInt(shard_s);
+//
+//            numShards = Integer.parseInt(conf.get(BULK_N3_NUMSHARD));
+//
+//            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
+//            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
+//            checkNotNull(rdfFormat, "Rdf format cannot be null");
+//
+//            parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
+//            parser.setRDFHandler(new RDFHandler() {
+//
+//                @Override
+//                public void startRDF() throws RDFHandlerException {
+//
+//                }
+//
+//                @Override
+//                public void endRDF() throws RDFHandlerException {
+//
+//                }
+//
+//                @Override
+//                public void handleNamespace(String s, String s1) throws RDFHandlerException {
+//
+//                }
+//
+//                @Override
+//                public void handleStatement(Statement statement) throws RDFHandlerException {
+//                    try {
+//                        Resource subject = statement.getSubject();
+//                        if ((hashAlgorithm.hash(subject.stringValue()) % numShards) != shard) {
+//                            return;
+//                        }
+//                        RyaTableKeyValues rdfTableKeyValues = new RyaTableKeyValues(subject, statement.getPredicate(), statement.getObject(), cv_s, statement.getContext()).invoke();
+//                        Collection<Map.Entry<Key, Value>> entries = null;
+//                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
+//                            entries = rdfTableKeyValues.getSpo();
+//                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
+//                            entries = rdfTableKeyValues.getPo();
+//                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
+//                            entries = rdfTableKeyValues.getOsp();
+//                        } else
+//                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
+//
+//                        for (Map.Entry<Key, Value> entry : entries) {
+//                            context.write(entry.getKey(), entry.getValue());
+//                        }
+//                    } catch (Exception e) {
+//                        throw new RDFHandlerException(e);
+//                    }
+//                }
+//
+//                @Override
+//                public void handleComment(String s) throws RDFHandlerException {
+//
+//                }
+//            });
+//        }
+//
+//        @Override
+//        public void map(LongWritable key, Text value, Context output)
+//                throws IOException, InterruptedException {
+//            String rdf = value.toString();
+//            try {
+//                parser.parse(new StringReader(rdf), "");
+//            } catch (RDFParseException e) {
+//                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
+//            }
+//        }
+//    }
+//
+//    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
+//
+//        public void reduce(Key key, Iterable<Value> values, Context output)
+//                throws IOException, InterruptedException {
+//            output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
new file mode 100644
index 0000000..453d6ca
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
@@ -0,0 +1,350 @@
+//package mvm.rya.cloudbase.mr.upgrade;
+//
+//import cloudbase.core.client.Connector;
+//import cloudbase.core.client.ZooKeeperInstance;
+//import cloudbase.core.client.admin.TableOperations;
+//import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+//import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+//import cloudbase.core.data.Key;
+//import cloudbase.core.data.Mutation;
+//import cloudbase.core.data.Range;
+//import cloudbase.core.data.Value;
+//import cloudbase.core.security.Authorizations;
+//import cloudbase.core.security.ColumnVisibility;
+//import cloudbase.core.util.Pair;
+//import com.google.common.collect.Lists;
+//import com.google.common.io.ByteArrayDataInput;
+//import com.google.common.io.ByteArrayDataOutput;
+//import com.google.common.io.ByteStreams;
+//import mvm.rya.api.InvalidValueTypeMarkerRuntimeException;
+//import mvm.rya.api.RdfCloudTripleStoreConstants;
+//import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
+//import mvm.rya.cloudbase.CloudbaseRdfConstants;
+//import mvm.rya.cloudbase.CloudbaseRyaDAO;
+//import mvm.rya.cloudbase.RyaTableMutationsFactory;
+//import mvm.rya.cloudbase.mr.utils.MRUtils;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.conf.Configured;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.hadoop.mapreduce.Mapper;
+//import org.apache.hadoop.util.Tool;
+//import org.apache.hadoop.util.ToolRunner;
+//import org.openrdf.model.*;
+//import org.openrdf.model.impl.StatementImpl;
+//import org.openrdf.model.impl.ValueFactoryImpl;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.Collection;
+//import java.util.Date;
+//import java.util.Map;
+//
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.*;
+//
+///**
+// * 1. Check version. <br/>
+// * 2. If version does not exist, apply: <br/>
+// * - DELIM => 1 -> 0
+// * - DELIM_STOP => 2 -> 1
+// * - 3 table index
+// */
+//public class UpgradeCloudbaseRdfTables extends Configured implements Tool {
+//    public static final String TMP = "_tmp";
+//    public static final String DELETE_PROP = "rdf.upgrade.deleteMutation"; //true if ok to deleteMutation old tables
+//    private String zk = "10.40.190.113:2181";
+//    private String instance = "stratus";
+//    private String userName = "root";
+//    private String pwd = "password";
+//    private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+//    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
+//
+//    @Override
+//    public int run(String[] strings) throws Exception {
+//        conf = new CloudbaseRdfConfiguration(getConf());
+//        //faster
+//        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+//        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+//        conf.set(MRUtils.JOB_NAME_PROP, "Upgrading Cloudbase Rdf Tables");
+//
+//        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
+//        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
+//        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
+//        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
+//
+//        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+//
+//        Authorizations authorizations = CloudbaseRdfConstants.ALL_AUTHORIZATIONS;
+//        String auth = conf.get(MRUtils.CB_AUTH_PROP);
+//        if (auth != null)
+//            authorizations = new Authorizations(auth.split(","));
+//
+//        boolean deleteTables = conf.getBoolean(DELETE_PROP, false);
+//
+//        //tables
+//        String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
+//        String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
+//        String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
+//        String so = tablePrefix + "so";
+//        String ops = tablePrefix + "o";
+//
+//        //check version first
+//        Connector connector = new ZooKeeperInstance(instance, zk).getConnector(userName, pwd.getBytes());
+//        CloudbaseRyaDAO rdfDAO = new CloudbaseRyaDAO();
+//        rdfDAO.setConnector(connector);
+//        conf.setTablePrefix(tablePrefix);
+//        rdfDAO.setConf(conf);
+////        rdfDAO.setSpoTable(spo);
+////        rdfDAO.setPoTable(po);
+////        rdfDAO.setOspTable(osp);
+////        rdfDAO.setNamespaceTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
+//        rdfDAO.init();
+//        String version = rdfDAO.getVersion();
+//        if (version != null) {
+//            //TODO: Do a version check here
+//            //version found, no need to upgrade
+//            return 0;
+//        }
+//
+//        rdfDAO.destroy();
+//
+//        //create osp table, deleteMutation so and o tables
+//        TableOperations tableOperations = connector.tableOperations();
+//        if (deleteTables) {
+//            if (tableOperations.exists(so)) {
+//                tableOperations.deleteMutation(so);
+//            }
+//            if (tableOperations.exists(ops)) {
+//                tableOperations.deleteMutation(ops);
+//            }
+//        }
+//
+//        conf.set("io.sort.mb", "256");
+//        Job job = new Job(conf);
+//        job.setJarByClass(UpgradeCloudbaseRdfTables.class);
+//
+//        //set up cloudbase input
+//        job.setInputFormatClass(CloudbaseInputFormat.class);
+//        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), spo, authorizations);
+//        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
+//        Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>();
+//        final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT);
+//        columns.add(pair);
+//        CloudbaseInputFormat.fetchColumns(job, columns);
+//
+//        CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
+//
+//        // set input output of the particular job
+//        job.setMapOutputKeyClass(Text.class);
+//        job.setMapOutputValueClass(Mutation.class);
+//
+//        //no reducer needed?
+//        job.setNumReduceTasks(0);
+//        job.setMapperClass(UpgradeCloudbaseRdfTablesMapper.class);
+//
+//        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, spo + TMP);
+//        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+//        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+//
+//        // Submit the job
+//        Date startTime = new Date();
+//        System.out.println("Job started: " + startTime);
+//        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+//
+//        if (exitCode == 0) {
+//            Date end_time = new Date();
+//            System.out.println("Job ended: " + end_time);
+//            System.out.println("The job took "
+//                    + (end_time.getTime() - startTime.getTime()) / 1000
+//                    + " seconds.");
+//
+//            //now deleteMutation old spo table, and rename tmp one
+//            if (deleteTables) {
+//                tableOperations.deleteMutation(spo);
+//                tableOperations.rename(spo + TMP, spo);
+//                tableOperations.deleteMutation(po);
+//                tableOperations.rename(po + TMP, po);
+//                tableOperations.deleteMutation(osp);
+//                tableOperations.rename(osp + TMP, osp);
+//            }
+//
+//            return 0;
+//        } else {
+//            System.out.println("Job Failed!!!");
+//        }
+//
+//        return -1;
+//    }
+//
+//    public static void main(String[] args) {
+//        try {
+//            ToolRunner.run(new Configuration(), new UpgradeCloudbaseRdfTables(), args);
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    public static class UpgradeCloudbaseRdfTablesMapper extends Mapper<Key, Value, Text, Mutation> {
+//        private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+//        ValueFactoryImpl vf = new ValueFactoryImpl();
+//
+//        private Text spo_table, po_table, osp_table;
+//
+//        RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
+//
+//        @Override
+//        protected void setup(Context context) throws IOException, InterruptedException {
+//            super.setup(context);
+//            Configuration conf = context.getConfiguration();
+//            tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+//            String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX + TMP;
+//            String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX + TMP;
+//            String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX + TMP;
+//
+//            spo_table = new Text(spo);
+//            po_table = new Text(po);
+//            osp_table = new Text(osp);
+//        }
+//
+//        @Override
+//        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+//            //read in old format
+//            Statement statement = null;
+//            try {
+//                statement = translateOldStatementFromRow(ByteStreams.newDataInput(key.getRow().getBytes()), "spo", vf);
+//            } catch (Exception e) {
+//                //not the right version
+//                return;
+//            }
+//
+//            //translate to new format and save in new tables
+//            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Mutation> mutationMap = mut.serialize(statement.getSubject(), statement.getPredicate(), statement.getObject(), new ColumnVisibility(key.getColumnVisibility()), statement.getContext());
+//            Mutation spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+//            Mutation po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+//            Mutation osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+//
+//            context.write(spo_table, spo);
+//            context.write(po_table, po);
+//            context.write(osp_table, osp);
+//
+//            //TODO: Contexts
+//        }
+//    }
+//
+//    public static org.openrdf.model.Value readOldValue(ByteArrayDataInput dataIn, ValueFactory vf)
+//            throws IOException, ClassCastException {
+//        int valueTypeMarker;
+//        try {
+//            valueTypeMarker = dataIn.readByte();
+//        } catch (Exception e) {
+//            return null;
+//        }
+//
+//        org.openrdf.model.Value ret = null;
+//        if (valueTypeMarker == RdfCloudTripleStoreConstants.URI_MARKER) {
+//            String uriString = readString(dataIn);
+//            ret = vf.createURI(uriString);
+//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.BNODE_MARKER) {
+//            String bnodeID = readString(dataIn);
+//            ret = vf.createBNode(bnodeID);
+//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER) {
+//            String label = readString(dataIn);
+//            ret = vf.createLiteral(label);
+//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER) {
+//            String label = readString(dataIn);
+//            String language = readString(dataIn);
+//            ret = vf.createLiteral(label, language);
+//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER) {
+//            String label = readString(dataIn);
+//            URI datatype = (URI) readOldValue(dataIn, vf);
+//            ret = vf.createLiteral(label, datatype);
+//        } else {
+//            throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: "
+//                    + valueTypeMarker);
+//        }
+//
+//        return ret;
+//    }
+//
+//    public static Statement translateOldStatementFromRow(ByteArrayDataInput input, String table, ValueFactory vf) throws IOException {
+//        Resource subject;
+//        URI predicate;
+//        org.openrdf.model.Value object;
+//        if ("spo".equals(table)) {
+//            subject = (Resource) readOldValue(input, vf);
+//            input.readByte();
+//            predicate = (URI) readOldValue(input, vf);
+//            input.readByte();
+//            object = readOldValue(input, vf);
+//        } else if ("o".equals(table)) {
+//            object = readOldValue(input, vf);
+//            input.readByte();
+//            predicate = (URI) readOldValue(input, vf);
+//            input.readByte();
+//            subject = (Resource) readOldValue(input, vf);
+//        } else if ("po".equals(table)) {
+//            predicate = (URI) readOldValue(input, vf);
+//            input.readByte();
+//            object = readOldValue(input, vf);
+//            input.readByte();
+//            subject = (Resource) readOldValue(input, vf);
+//        } else {
+//            //so
+//            subject = (Resource) readOldValue(input, vf);
+//            input.readByte();
+//            object = readOldValue(input, vf);
+//            input.readByte();
+//            predicate = (URI) readOldValue(input, vf);
+//        }
+//        return new StatementImpl(subject, predicate, object);
+//    }
+//
+//    public static byte[] writeOldValue(org.openrdf.model.Value value) throws IOException {
+//        if (value == null)
+//            return new byte[]{};
+//        ByteArrayDataOutput dataOut = ByteStreams.newDataOutput();
+//        if (value instanceof URI) {
+//            dataOut.writeByte(RdfCloudTripleStoreConstants.URI_MARKER);
+//            writeString(((URI) value).toString(), dataOut);
+//        } else if (value instanceof BNode) {
+//            dataOut.writeByte(RdfCloudTripleStoreConstants.BNODE_MARKER);
+//            writeString(((BNode) value).getID(), dataOut);
+//        } else if (value instanceof Literal) {
+//            Literal lit = (Literal) value;
+//
+//            String label = lit.getLabel();
+//            String language = lit.getLanguage();
+//            URI datatype = lit.getDatatype();
+//
+//            if (datatype != null) {
+//                dataOut.writeByte(RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER);
+//                writeString(label, dataOut);
+//                dataOut.write(writeOldValue(datatype));
+//            } else if (language != null) {
+//                dataOut.writeByte(RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER);
+//                writeString(label, dataOut);
+//                writeString(language, dataOut);
+//            } else {
+//                dataOut.writeByte(RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER);
+//                writeString(label, dataOut);
+//            }
+//        } else {
+//            throw new IllegalArgumentException("unexpected value type: "
+//                    + value.getClass());
+//        }
+//        return dataOut.toByteArray();
+//    }
+//
+//    private static String OLD_DELIM = "\u0001";
+//    private static byte[] OLD_DELIM_BYTES = OLD_DELIM.getBytes();
+//
+//    public static byte[] buildOldRowWith(byte[] bytes_one, byte[] bytes_two, byte[] bytes_three) throws IOException {
+//        ByteArrayDataOutput rowidout = ByteStreams.newDataOutput();
+//        rowidout.write(bytes_one);
+//        rowidout.write(OLD_DELIM_BYTES);
+//        rowidout.write(bytes_two);
+//        rowidout.write(OLD_DELIM_BYTES);
+//        rowidout.write(bytes_three);
+//        return truncateRowId(rowidout.toByteArray());
+//    }
+//}
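
The commented-out upgrade tool above reads rows written in the pre-upgrade layout, where the three serialized statement components are joined by the old "\u0001" delimiter (see OLD_DELIM and buildOldRowWith). The standalone sketch below illustrates only that joining step; it is a hypothetical, JDK-only illustration rather than part of this commit, and it omits the value-type markers written by writeOldValue and the truncateRowId call.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class OldRowLayoutSketch {
    // Old layout: component \u0001 component \u0001 component, mirroring OLD_DELIM above.
    private static final byte[] OLD_DELIM_BYTES = "\u0001".getBytes();

    static byte[] joinOldRow(byte[] first, byte[] second, byte[] third) throws IOException {
        ByteArrayOutputStream row = new ByteArrayOutputStream();
        row.write(first);
        row.write(OLD_DELIM_BYTES);
        row.write(second);
        row.write(OLD_DELIM_BYTES);
        row.write(third);
        return row.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder URIs for illustration only; real rows carry marker-prefixed serialized values.
        byte[] row = joinOldRow(
                "urn:example#subject".getBytes(),
                "urn:example#predicate".getBytes(),
                "urn:example#object".getBytes());
        System.out.println("old-format row is " + row.length + " bytes (three components plus two delimiter bytes)");
    }
}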

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
new file mode 100644
index 0000000..950f585
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
@@ -0,0 +1,94 @@
+package mvm.rya.cloudbase.mr.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Class MRSailUtils
+ * Date: May 19, 2011
+ * Time: 10:34:06 AM
+ */
+public class MRUtils {
+
+    public static final String JOB_NAME_PROP = "mapred.job.name";
+
+    public static final String CB_USERNAME_PROP = "cb.username";
+    public static final String CB_PWD_PROP = "cb.pwd";
+    public static final String CB_ZK_PROP = "cb.zk";
+    public static final String CB_INSTANCE_PROP = "cb.instance";
+    public static final String CB_TTL_PROP = "cb.ttl";
+    public static final String CB_CV_PROP = "cb.cv";
+    public static final String CB_AUTH_PROP = "cb.auth";
+    public static final String CB_MOCK_PROP = "cb.mock";
+    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
+    public static final String FORMAT_PROP = "rdf.format";
+
+    public static final String NAMED_GRAPH_PROP = "rdf.graph";
+
+    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
+
+    // rdf constants
+    public static final ValueFactory vf = new ValueFactoryImpl();
+    public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type");
+
+
+    // cloudbase map reduce utils
+
+//    public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException {
+//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
+//        startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key));
+//        if (entry_val != null) {
+//            startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES);
+//            startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val));
+//        }
+//        byte[] startrow = startRowOut.toByteArray();
+//        startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES);
+//        byte[] stoprow = startRowOut.toByteArray();
+//
+//        Range range = new Range(new Text(startrow), new Text(stoprow));
+//        return range;
+//    }
+
+
+    public static String getCBTtl(Configuration conf) {
+        return conf.get(CB_TTL_PROP);
+    }
+
+    public static String getCBUserName(Configuration conf) {
+        return conf.get(CB_USERNAME_PROP);
+    }
+
+    public static String getCBPwd(Configuration conf) {
+        return conf.get(CB_PWD_PROP);
+    }
+
+    public static String getCBZK(Configuration conf) {
+        return conf.get(CB_ZK_PROP);
+    }
+
+    public static String getCBInstance(Configuration conf) {
+        return conf.get(CB_INSTANCE_PROP);
+    }
+
+    public static void setCBUserName(Configuration conf, String str) {
+        conf.set(CB_USERNAME_PROP, str);
+    }
+
+    public static void setCBPwd(Configuration conf, String str) {
+        conf.set(CB_PWD_PROP, str);
+    }
+
+    public static void setCBZK(Configuration conf, String str) {
+        conf.set(CB_ZK_PROP, str);
+    }
+
+    public static void setCBInstance(Configuration conf, String str) {
+        conf.set(CB_INSTANCE_PROP, str);
+    }
+
+    public static void setCBTtl(Configuration conf, String str) {
+        conf.set(CB_TTL_PROP, str);
+    }
+}
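
MRUtils is a thin naming wrapper around Hadoop's Configuration: each setter/getter pair reads or writes one of the "cb.*" or "rdf.*" keys defined above. A minimal, hypothetical driver-side usage sketch (the connection values are placeholders, not taken from this commit) could look like:

import org.apache.hadoop.conf.Configuration;
import mvm.rya.cloudbase.mr.utils.MRUtils;

public class MRUtilsUsageSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Stage the Cloudbase connection settings under the property names defined in MRUtils.
        MRUtils.setCBZK(conf, "localhost:2181");   // cb.zk
        MRUtils.setCBInstance(conf, "stratus");    // cb.instance
        MRUtils.setCBUserName(conf, "root");       // cb.username
        MRUtils.setCBPwd(conf, "password");        // cb.pwd
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");

        // A Tool#run or Mapper#setup later reads the same values back from the job configuration.
        System.out.println(MRUtils.getCBInstance(conf) + " @ " + MRUtils.getCBZK(conf));
    }
}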

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
new file mode 100644
index 0000000..d3f8ae7
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
@@ -0,0 +1,34 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.base.Preconditions;
+import mango.collect.AbstractCloseableIterable;
+import mvm.rya.cloudbase.BatchScannerIterator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ */
+public class BatchScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
+
+    private BatchScanner scanner;
+
+    public BatchScannerCloseableIterable(BatchScanner scanner) {
+        Preconditions.checkNotNull(scanner);
+        this.scanner = scanner;
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
+        return new BatchScannerIterator(scanner.iterator());
+    }
+}
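
BatchScannerCloseableIterable simply adapts a Cloudbase BatchScanner to mango's AbstractCloseableIterable: retrieveIterator() wraps the scanner's iterator and doClose() releases the scanner. A hedged consumer sketch follows; it assumes AbstractCloseableIterable exposes Iterable plus a close() method, and that Connector offers an Accumulo-style createBatchScanner(table, authorizations, numThreads) with Range and Authorizations in the packages imported below -- none of those signatures are shown in this diff.

import cloudbase.core.client.BatchScanner;
import cloudbase.core.client.Connector;
import cloudbase.core.data.Key;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.security.Authorizations;
import mvm.rya.cloudbase.query.BatchScannerCloseableIterable;

import java.util.Collections;
import java.util.Map;

public class BatchScannerIterableSketch {
    static void dumpTable(Connector connector, String table, Authorizations auths) throws Exception {
        // Assumed Accumulo-style API: a batch scanner over the whole table with 4 query threads.
        BatchScanner scanner = connector.createBatchScanner(table, auths, 4);
        scanner.setRanges(Collections.singleton(new Range()));

        BatchScannerCloseableIterable entries = new BatchScannerCloseableIterable(scanner);
        try {
            for (Map.Entry<Key, Value> entry : entries) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        } finally {
            entries.close(); // doClose() shuts down the underlying BatchScanner
        }
    }
}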
