http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c3ddcfd..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,369 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.accumulo.AccumuloRdfUtils.extractValue;
-import static mvm.rya.accumulo.AccumuloRdfUtils.from;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.ParserConfig;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.nquads.NQuadsParser;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Take large ntrips files and use MapReduce and Cloudbase
- * Bulk ingest techniques to load into the table in our partition format.
- * <p/>
- * Input: NTrips file
- * Map:
- * - key : shard row - Text
- * - value : stmt in doc triple format - Text
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Cloudbase Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
-    public static final String WORKDIR_PROP = "bulk.n3.workdir";
-
-    private String userName = "root";
-    private String pwd = "root";
-    private String instance = "isntance";
-    private String zk = "zoo";
-    private String ttl = null;
-    private String workDirBase = "/temp/bulkcb/work";
-    private String format = RDFFormat.NQUADS.getName();
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        final Configuration conf = getConf();
-        try {
-            //conf
-            zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-            ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
-            instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-            userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-            pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-            format = conf.get(MRUtils.FORMAT_PROP, format);
-            conf.set(MRUtils.FORMAT_PROP, format);
-            final String inputDir = args[0];
-
-            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-            Connector connector = zooKeeperInstance.getConnector(userName, new PasswordToken(pwd));
-            TableOperations tableOperations = connector.tableOperations();
-            
-            if (conf.get(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS) != null ) {
-                throw new IllegalArgumentException("Cannot use Bulk N Trips tool with Additional Indexers");
-            }
-
-            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (tablePrefix != null)
-                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-            Collection<Job> jobs = new ArrayList<Job>();
-            for (final String tableName : tables) {
-                PrintStream out = null;
-                try {
-                    String workDir = workDirBase + "/" + tableName;
-                    System.out.println("Loading data into table[" + tableName + "]");
-
-                    Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
-                    job.setJarByClass(this.getClass());
-                    //setting long job
-                    Configuration jobConf = job.getConfiguration();
-                    jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
-                    jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-                    jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
-                    jobConf.setBoolean("mapred.compress.map.output", true);
-//                    jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-
-                    job.setInputFormatClass(TextInputFormat.class);
-
-                    job.setMapperClass(ParseNtripsMapper.class);
-                    job.setMapOutputKeyClass(Key.class);
-                    job.setMapOutputValueClass(Value.class);
-
-                    job.setCombinerClass(OutStmtMutationsReducer.class);
-                    job.setReducerClass(OutStmtMutationsReducer.class);
-                    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-                   // AccumuloFileOutputFormat.setZooKeeperInstance(jobConf, instance, zk);
-
-                    jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
-
-                    TextInputFormat.setInputPaths(job, new Path(inputDir));
-
-                    FileSystem fs = FileSystem.get(conf);
-                    Path workPath = new Path(workDir);
-                    if (fs.exists(workPath))
-                        fs.delete(workPath, true);
-
-                    //make failures dir
-                    Path failures = new Path(workDir, "failures");
-                    fs.delete(failures, true);
-                    fs.mkdirs(new Path(workDir, "failures"));
-
-                    AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
-                    out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
-                    if (!tableOperations.exists(tableName))
-                        tableOperations.create(tableName);
-                    Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-                    for (Text split : splits)
-                        out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
-                    job.setNumReduceTasks(splits.size() + 1);
-                    out.close();
-
-                    job.setPartitionerClass(KeyRangePartitioner.class);
-                    RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
-                    jobConf.set(WORKDIR_PROP, workDir);
-
-                    job.submit();
-                    jobs.add(job);
-
-                } catch (Exception re) {
-                    throw new RuntimeException(re);
-                } finally {
-                    if (out != null)
-                        out.close();
-                }
-            }
-
-            for (Job job : jobs) {
-                while (!job.isComplete()) {
-                    Thread.sleep(1000);
-                }
-            }
-
-            for(String tableName : tables) {
-                String workDir = workDirBase + "/" + tableName;
-                String filesDir = workDir + "/files";
-                String failuresDir = workDir + "/failures";
-                
-                FileSystem fs = FileSystem.get(conf);
-                
-                //make sure that the "accumulo" user can read/write/execute into these directories
-                fs.setPermission(new Path(filesDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                fs.setPermission(new Path(failuresDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                
-                tableOperations.importDirectory(
-                        tableName,
-                        filesDir,
-                        failuresDir,
-                        false);
-                
-            }
-
-        } catch (Exception e ){
-            throw new RuntimeException(e);
-        }
-
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-       ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
-    }
-
-    /**
-     * input: ntrips format triple
-     * <p/>
-     * output: key: shard row from generator
-     * value: stmt in serialized format (document format)
-     */
-    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
-
-        private RDFParser parser;
-        private String rdfFormat;
-        private String namedGraph;
-        private RyaTripleContext ryaContext;
-        private TripleRowResolver rowResolver;
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            final String table = conf.get(TABLE_PROPERTY);
-            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            rowResolver = ryaContext.getTripleResolver();
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
-            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-            checkNotNull(rdfFormat, "Rdf format cannot be null");
-
-            namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
-
-            parser = new NQuadsParser();
-               parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
-                        if(rs.getColumnVisibility() == null) {
-                            rs.setColumnVisibility(cv);
-                        }
-
-                       // Inject the specified context into the statement.
-                        if(namedGraph != null){
-                            rs.setContext(new RyaURI(namedGraph));
-                        } else if (statement.getContext() != null) {
-                            rs.setContext(new RyaURI(statement.getContext().toString()));
-                        } 
-
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs);
-
-                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.PO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.OSP);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else
-                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        public void map(LongWritable key, Text value, Context output)
-                throws IOException, InterruptedException {
-            String rdf = value.toString();
-            try {
-                parser.parse(new StringReader(rdf), "");
-            } catch (RDFParseException e) {
-                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-            }
-        }
-    }
-
-    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
-        public void reduce(Key key, Iterable<Value> values, Context output)
-                throws IOException, InterruptedException {
-            output.write(key, AccumuloRdfConstants.EMPTY_VALUE);
-        }
-    }
-}

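For reference: the removed BulkNtripsInputTool was driven entirely by Hadoop configuration properties read at the top of run(). A minimal launch sketch (not part of this commit; the connection values, table prefix, and input path are placeholders) might look like:

    import mvm.rya.accumulo.mr.fileinput.BulkNtripsInputTool;
    import mvm.rya.accumulo.mr.utils.MRUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkNtripsLaunchSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");          // placeholder Accumulo instance name
            conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181");               // placeholder ZooKeeper quorum
            conf.set(MRUtils.AC_USERNAME_PROP, "root");
            conf.set(MRUtils.AC_PWD_PROP, "secret");
            conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");
            conf.set(BulkNtripsInputTool.WORKDIR_PROP, "/temp/bulkcb/work");
            // args[0] is the HDFS directory holding the N-Triples/N-Quads files to bulk load
            System.exit(ToolRunner.run(conf, new BulkNtripsInputTool(), new String[]{"/data/ntriples"}));
        }
    }

The tool then ran one bulk-ingest job per SPO/PO/OSP table and finished with tableOperations.importDirectory, as shown above.
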
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
deleted file mode 100644
index 5a872a0..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
+++ /dev/null
@@ -1,251 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-/**
- * Do bulk import of rdf files
- * Class RdfFileInputTool2
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputByLineTool implements Tool {
-
-    private Configuration conf = new Configuration();
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "instance";
-    private String zk = "zoo";
-    private String tablePrefix = null;
-    private RDFFormat format = RDFFormat.NTRIPLES;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException {
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-        conf.setLong("mapred.task.timeout", 600000000);
-
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-
-        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputByLineTool.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd.getBytes()));
-        AccumuloOutputFormat.setCreateTables(job, true);
-        AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-        AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        // set mapper and reducer classes
-        job.setMapperClass(TextToMutationMapper.class);
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        return (int) runJob(args);
-    }
-
-    public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-        protected RDFParser parser;
-        private String prefix;
-        private RDFFormat rdfFormat;
-        protected Text spo_table;
-        private Text po_table;
-        private Text osp_table;
-        private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
-
-        public TextToMutationMapper() {
-        }
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (prefix != null) {
-                RdfCloudTripleStoreConstants.prefixTables(prefix);
-            }
-
-            spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = cv_s.getBytes();
-
-            rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-            parser = Rio.createParser(rdfFormat);
-            RyaTripleContext tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(tripleContext);
-
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
-                        if(ryaStatement.getColumnVisibility() == null) {
-                            ryaStatement.setColumnVisibility(cv);
-                        }
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                                mut.serialize(ryaStatement);
-                        Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-                        Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-                        Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-                        for (Mutation m : spo) {
-                            context.write(spo_table, m);
-                        }
-                        for (Mutation m : po) {
-                            context.write(po_table, m);
-                        }
-                        for (Mutation m : osp) {
-                            context.write(osp_table, m);
-                        }
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
-            try {
-                parser.parse(new StringReader(value.toString()), "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-    }
-}
-

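For reference: the removed TextToMutationMapper parsed each input line as RDF and fanned every statement out to the SPO, PO, and OSP tables through RyaTableMutationsFactory. A standalone sketch of that fan-out (not part of this commit; the statement values are made up) might look like:

    import java.util.Collection;
    import java.util.Map;

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.RyaTableMutationsFactory;
    import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.resolver.RdfToRyaConversions;
    import mvm.rya.api.resolver.RyaTripleContext;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.conf.Configuration;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;

    public class MutationFanOutSketch {
        public static void main(String[] args) throws Exception {
            ValueFactory vf = ValueFactoryImpl.getInstance();
            // Example statement only; the mapper built these from parsed N-Triples lines.
            RyaStatement stmt = RdfToRyaConversions.convertStatement(
                    vf.createStatement(vf.createURI("urn:ex:s"), vf.createURI("urn:ex:p"), vf.createLiteral("o")));
            RyaTableMutationsFactory factory = new RyaTableMutationsFactory(
                    RyaTripleContext.getInstance(new AccumuloRdfConfiguration(new Configuration())));
            // One statement yields mutations for each of the three core tables.
            Map<TABLE_LAYOUT, Collection<Mutation>> byLayout = factory.serialize(stmt);
            System.out.println("SPO mutations: " + byLayout.get(TABLE_LAYOUT.SPO).size());
        }
    }

The mapper wrote each collection to its <prefix>spo, <prefix>po, or <prefix>osp table via AccumuloOutputFormat, with no reduce phase.
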
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index f20dfe3..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-/**
- * Be able to input multiple rdf formatted files. Convert from rdf format to statements.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
-
-    @Override
-    public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
-                                                                               TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new RdfFileRecordReader();
-    }
-
-    private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
-
-        boolean closed = false;
-        long count = 0;
-        BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>();
-        int total = 0;
-               private RyaTripleContext tripleContext;
-        
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-            FileSplit fileSplit = (FileSplit) inputSplit;
-            Configuration conf = taskAttemptContext.getConfiguration();
-            String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
-            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-            tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-
-            Path file = fileSplit.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
-            RDFParser rdfParser = Rio.createParser(rdfFormat);
-            rdfParser.setRDFHandler(this);
-            try {
-                rdfParser.parse(fileIn, "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            fileIn.close();
-            total = queue.size();
-            //TODO: Make this threaded so that you don't hold too many statements before sending them
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return queue.size() > 0;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return new LongWritable(count++);
-        }
-
-        @Override
-        public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
-            return queue.poll();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return ((float) (total - queue.size())) / ((float) total);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-        }
-
-        @Override
-        public void startRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void endRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleNamespace(String s, String s1) throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleStatement(Statement statement) throws RDFHandlerException {
-            queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext));
-        }
-
-        @Override
-        public void handleComment(String s) throws RDFHandlerException {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
deleted file mode 100644
index 673d65f..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.rio.RDFFormat;
-
-/**
- * Do bulk import of rdf files
- * Class RdfFileInputTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool {
-
-    private String format = RDFFormat.RDFXML.getName();
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException {
-        conf.set(MRUtils.JOB_NAME_PROP, "Rdf File Input");
-        //faster
-        init();
-        format = conf.get(MRUtils.FORMAT_PROP, format);
-        conf.set(MRUtils.FORMAT_PROP, format);
-        
-        String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputTool.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(RdfFileInputFormat.class);
-        RdfFileInputFormat.addInputPath(job, new Path(inputPath));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(RyaStatementWritable.class);
-
-        setupOutputFormat(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-
-        // set mapper and reducer classes
-        job.setMapperClass(StatementToMutationMapper.class);
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> {
-        protected String tablePrefix;
-        protected Text spo_table;
-        protected Text po_table;
-        protected Text osp_table;
-        private byte[] cv = EMPTY_CV.getExpression();
-        RyaTableMutationsFactory mut;
-
-        public StatementToMutationMapper() {
-        }
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            mut = new RyaTableMutationsFactory(RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)));
-            tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-            spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = cv_s.getBytes();
-        }
-
-        @Override
-        protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
-            RyaStatement statement = value.getRyaStatement();
-            if (statement.getColumnVisibility() == null) {
-                statement.setColumnVisibility(cv);
-            }
-            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                    mut.serialize(statement);
-            Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-            Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-            Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-            for (Mutation m : spo) {
-                context.write(spo_table, m);
-            }
-            for (Mutation m : po) {
-                context.write(po_table, m);
-            }
-            for (Mutation m : osp) {
-                context.write(osp_table, m);
-            }
-        }
-
-    }
-}
-

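For reference: unlike the by-line tool, the removed RdfFileInputTool read whole files through RdfFileInputFormat, so any Rio-supported format could be loaded (RDF/XML is the default above). A hypothetical launch sketch, assuming AbstractAccumuloMRTool.init() reads the same MRUtils connection properties (placeholder values shown; not part of this commit):

    import mvm.rya.accumulo.mr.fileinput.RdfFileInputTool;
    import mvm.rya.accumulo.mr.utils.MRUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.openrdf.rio.RDFFormat;

    public class RdfFileInputLaunchSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");               // placeholder
            conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181");                    // placeholder
            conf.set(MRUtils.AC_USERNAME_PROP, "root");
            conf.set(MRUtils.AC_PWD_PROP, "secret");
            conf.set(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName());  // overrides the RDF/XML default
            // args[0] (or MRUtils.INPUT_PATH) names the HDFS path of the RDF files to load
            System.exit(ToolRunner.run(conf, new RdfFileInputTool(), new String[]{"/data/rdf"}));
        }
    }
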
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
deleted file mode 100644
index 89f0aa5..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package mvm.rya.accumulo.mr.upgrade;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.calrissian.mango.types.LexiTypeEncoders;
-import org.calrissian.mango.types.TypeEncoder;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
-
-/**
- */
-public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool {
-    @Override
-    public int run(String[] strings) throws Exception {
-        conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
-        //faster
-        init();
-
-        Job job = new Job(conf);
-        job.setJarByClass(Upgrade322Tool.class);
-
-        setupInputFormat(job);
-        AccumuloInputFormat.setInputTableName(job, tablePrefix + TBL_OSP_SUFFIX);
-
-        //we do not need to change any row that is a string, custom, or uri type
-        IteratorSetting regex = new IteratorSetting(30, "regex",
-                                                    RegExFilter.class);
-        RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false);
-        RegExFilter.setNegate(regex, true);
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        setupOutputFormat(job, tablePrefix +
-                               TBL_SPO_SUFFIX);
-
-        // set mapper and reducer classes
-        job.setMapperClass(Upgrade322Mapper.class);
-        job.setReducerClass(Reducer.class);
-
-        // Submit the job
-        return job.waitForCompletion(true) ? 0 : 1;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new Upgrade322Tool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Reading from the OSP table
-     */
-    public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> {
-
-        private String tablePrefix;
-        private Text spoTable;
-        private Text poTable;
-        private Text ospTable;
-
-        private final UpgradeObjectSerialization upgradeObjectSerialization;
-
-        public Upgrade322Mapper() {
-            this(new UpgradeObjectSerialization());
-        }
-
-        public Upgrade322Mapper(
-          UpgradeObjectSerialization upgradeObjectSerialization) {
-            this.upgradeObjectSerialization = upgradeObjectSerialization;
-        }
-
-        @Override
-        protected void setup(
-          Context context) throws IOException, InterruptedException {
-            super.setup(context);
-
-            tablePrefix = context.getConfiguration().get(
-              MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
-            spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
-            poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
-            ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
-        }
-
-        @Override
-        protected void map(
-          Key key, Value value, Context context)
-          throws IOException, InterruptedException {
-
-            //read the key, expect OSP
-            final String row = key.getRow().toString();
-            final int firstDelim = row.indexOf(DELIM);
-            final int secondDelim = row.indexOf(DELIM, firstDelim + 1);
-            final int typeDelim = row.lastIndexOf(TYPE_DELIM);
-            final String oldSerialization = row.substring(0, firstDelim);
-            char typeMarker = row.charAt(row.length() - 1);
-
-            final String subject = row.substring(firstDelim + 1, secondDelim);
-            final String predicate = row.substring(secondDelim + 1, typeDelim);
-            final String typeSuffix = TYPE_DELIM + typeMarker;
-
-            String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker);
-            if(newSerialization == null) {
-                return;
-            }
-
-            //write out delete Mutations
-            Mutation deleteOldSerialization_osp = new Mutation(key.getRow());
-            deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
-                               key.getColumnVisibilityParsed());
-            Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix);
-            deleteOldSerialization_po.putDelete(key.getColumnFamily(),
-                                                key.getColumnQualifier(),
-                                                key.getColumnVisibilityParsed());
-            Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix);
-            deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
-                                                key.getColumnVisibilityParsed());
-
-            //write out new serialization
-            Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix);
-            putNewSerialization_osp.put(key.getColumnFamily(),
-                                        key.getColumnQualifier(),
-                                        key.getColumnVisibilityParsed(),
-                                        key.getTimestamp(), value);
-            Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix);
-            putNewSerialization_po.put(key.getColumnFamily(),
-                                       key.getColumnQualifier(),
-                                       key.getColumnVisibilityParsed(),
-                                       key.getTimestamp(), value);
-            Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix);
-            putNewSerialization_spo.put(key.getColumnFamily(),
-                                        key.getColumnQualifier(),
-                                        key.getColumnVisibilityParsed(),
-                                        key.getTimestamp(), value);
-
-            //write out deletes to all tables
-            context.write(ospTable, deleteOldSerialization_osp);
-            context.write(poTable, deleteOldSerialization_po);
-            context.write(spoTable, deleteOldSerialization_spo);
-
-            //write out inserts to all tables
-            context.write(ospTable, putNewSerialization_osp);
-            context.write(poTable, putNewSerialization_po);
-            context.write(spoTable, putNewSerialization_spo);
-        }
-    }
-
-    public static class UpgradeObjectSerialization {
-
-        public static final TypeEncoder<Boolean, String>
-          BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder();
-        public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER
-          = LexiTypeEncoders.byteEncoder();
-        public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER
-          = LexiTypeEncoders.dateEncoder();
-        public static final TypeEncoder<Integer, String>
-          INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder();
-        public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER
-          = LexiTypeEncoders.longEncoder();
-        public static final TypeEncoder<Double, String>
-          DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder();
-
-        public String upgrade(String object, int typeMarker) {
-            switch(typeMarker) {
-                case 10: //boolean
-                    final boolean bool = Boolean.parseBoolean(object);
-                    return BOOLEAN_STRING_TYPE_ENCODER.encode(bool);
-                case 9: //byte
-                    final byte b = Byte.parseByte(object);
-                    return BYTE_STRING_TYPE_ENCODER.encode(b);
-                case 4: //long
-                    final Long lng = Long.parseLong(object);
-                    return LONG_STRING_TYPE_ENCODER.encode(lng);
-                case 5: //int
-                    final Integer i = Integer.parseInt(object);
-                    return INTEGER_STRING_TYPE_ENCODER.encode(i);
-                case 6: //double
-                    String exp = object.substring(2, 5);
-                    char valueSign = object.charAt(0);
-                    char expSign = object.charAt(1);
-                    Integer expInt = Integer.parseInt(exp);
-                    if (expSign == '-') {
-                        expInt = 999 - expInt;
-                    }
-                    final String expDoubleStr =
-                      String.format("%s%sE%s%d", valueSign,
-                                    object.substring(6),
-                                    expSign, expInt);
-                    return DOUBLE_STRING_TYPE_ENCODER
-                      .encode(Double.parseDouble(expDoubleStr));
-                case 7: //datetime
-                    //check to see if it is an early release that includes the exact term xsd:dateTime
-                    final Long l = Long.MAX_VALUE - Long.parseLong(object);
-                    Date date = new Date(l);
-                    return DATE_STRING_TYPE_ENCODER.encode(date);
-                default:
-                    return null;
-            }
-        }
-    }
-}

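For reference: the removed Upgrade322Mapper re-encoded typed object literals from the pre-3.2.2 row serialization to the Lexi encoders, keyed on the trailing type marker handled in the switch above (10 boolean, 9 byte, 4 long, 5 int, 6 double, 7 datetime). A small usage sketch (not part of this commit; the literal and markers are example inputs):

    import mvm.rya.accumulo.mr.upgrade.Upgrade322Tool;

    public class UpgradeEncodingSketch {
        public static void main(String[] args) {
            Upgrade322Tool.UpgradeObjectSerialization upgrader =
                    new Upgrade322Tool.UpgradeObjectSerialization();
            // Marker 4 is handled as a long, so "123" is re-encoded with LexiTypeEncoders.longEncoder().
            System.out.println(upgrader.upgrade("123", 4));
            // Unrecognized markers return null, and the mapper above skips that row entirely.
            System.out.println(upgrader.upgrade("anything", 99));
        }
    }
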
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
deleted file mode 100644
index c9dac6b..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
-import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-
-/**
- * Finds the accumulo tablet files on the hdfs disk, and uses that as the input for MR jobs
- * Date: 5/11/12
- * Time: 2:04 PM
- */
-public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> {
-
-    public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD"));
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
-        //read the params from AccumuloInputFormat
-        Configuration conf = jobContext.getConfiguration();
-        Instance instance = AccumuloProps.getInstance(jobContext);
-        String user = AccumuloProps.getUsername(jobContext);
-        AuthenticationToken password = AccumuloProps.getPassword(jobContext);
-        String table = AccumuloProps.getTablename(jobContext);
-        ArgumentChecker.notNull(instance);
-        ArgumentChecker.notNull(table);
-
-        //find the files necessary
-        try {
-               AccumuloConfiguration acconf = instance.getConfiguration();
-            FileSystem fs = FileSystem.get(conf);
-            Connector connector = instance.getConnector(user, password);
-            TableOperations tos = connector.tableOperations();
-            String tableId = tos.tableIdMap().get(table);
-            String filePrefix = acconf.get(Property.INSTANCE_DFS_DIR) + "/tables/" + tableId;
-            System.out.println(filePrefix);
-
-            Scanner scanner = connector.createScanner("!METADATA", Constants.NO_AUTHS); //TODO: auths?
-            scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD")));
-            scanner.fetchColumnFamily(new Text("file"));
-            List<String> files = new ArrayList<String>();
-            List<InputSplit> fileSplits = new ArrayList<InputSplit>();
-            Job job = new Job(conf);
-            for (Map.Entry<Key, Value> entry : scanner) {
-                String file = filePrefix + entry.getKey().getColumnQualifier().toString();
-                files.add(file);
-                Path path = new Path(file);
-                FileStatus fileStatus = fs.getFileStatus(path);
-                long len = fileStatus.getLen();
-                BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len);
-                fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts()));
-//                FileInputFormat.addInputPath(job, path);
-            }
-            System.out.println(files);
-            return fileSplits;
-//            return super.getSplits(job);
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-        return new RecordReader<Key, Value>() {
-
-            private FileSKVIterator fileSKVIterator;
-
-            @Override
-            public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-                FileSplit split = (FileSplit) inputSplit;
-                Configuration job = taskAttemptContext.getConfiguration();
-                Path file = split.getPath();
-//                long start = split.getStart();
-//                long length = split.getLength();
-                FileSystem fs = file.getFileSystem(job);
-//                FSDataInputStream fileIn = fs.open(file);
-//                System.out.println(start);
-//                if (start != 0L) {
-//                    fileIn.seek(start);
-//                }
-                Instance instance = AccumuloProps.getInstance(taskAttemptContext);
-
-                fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE,
-                        new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration());
-//                fileSKVIterator = new RFileOperations2().openReader(fileIn, length - start, job);
-            }
-
-            @Override
-            public boolean nextKeyValue() throws IOException, InterruptedException {
-                fileSKVIterator.next();
-                return fileSKVIterator.hasTop();
-            }
-
-            @Override
-            public Key getCurrentKey() throws IOException, InterruptedException {
-                return fileSKVIterator.getTopKey();
-            }
-
-            @Override
-            public Value getCurrentValue() throws IOException, InterruptedException {
-                return fileSKVIterator.getTopValue();
-            }
-
-            @Override
-            public float getProgress() throws IOException, InterruptedException {
-                return 0;
-            }
-
-            @Override
-            public void close() throws IOException {
-                //To change body of implemented methods use File | Settings | File Templates.
-            }
-        };
-    }
-
-    public static void main(String[] args) {
-        try {
-            Job job = new Job(new Configuration());
-            job.setJarByClass(AccumuloHDFSFileInputFormat.class);
-            Configuration conf = job.getConfiguration();
-            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-            conf.setBoolean("mapred.reduce.tasks.speculative.execution", 
false);
-            AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
-            AccumuloInputFormat.setInputTableName(job, "l_spo");
-            AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS);
-            AccumuloInputFormat.setZooKeeperInstance(job, "acu13", "stratus25:2181");
-            AccumuloInputFormat.setRanges(job, Collections.singleton(ALLRANGE));
-            job.setMapperClass(NullMapper.class);
-            job.setNumReduceTasks(0);
-            job.setOutputFormatClass(NullOutputFormat.class);
-            if (args.length == 0) {
-                job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
-            } else {
-                job.setInputFormatClass(AccumuloInputFormat.class);
-            }
-            job.waitForCompletion(true);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static class NullMapper extends Mapper {
-        @Override
-        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
-
-        }
-    }
-}
-
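
For reference, the removed input format was driven exactly the way its own main() above shows: connection details go through AccumuloInputFormat's static setters (read back later via AccumuloProps), and only the input-format class is swapped. A minimal driver sketch, with placeholder credentials, instance, ZooKeeper, and table names:

    import mvm.rya.accumulo.mr.fileinput.AccumuloHDFSFileInputFormat;
    import org.apache.accumulo.core.Constants;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class HdfsTabletScanDriver {
        public static void main(String[] args) throws Exception {
            Job job = new Job(new Configuration());
            job.setJarByClass(HdfsTabletScanDriver.class);

            // Placeholder credentials and addresses; AccumuloHDFSFileInputFormat reads
            // them back through AccumuloProps when it computes splits.
            AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
            AccumuloInputFormat.setZooKeeperInstance(job, "instanceName", "zkhost:2181");
            AccumuloInputFormat.setInputTableName(job, "rya_spo");
            AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS);

            // Swap in the HDFS-backed format so mappers read the table's RFiles directly
            // instead of scanning through the tablet servers.
            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
            job.setMapperClass(AccumuloHDFSFileInputFormat.NullMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputFormatClass(NullOutputFormat.class);
            job.waitForCompletion(true);
        }
    }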

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
deleted file mode 100644
index 2b89440..0000000
--- 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-@SuppressWarnings("rawtypes")
-public class AccumuloProps extends InputFormatBase {
-
-    @Override
-    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException("Accumulo Props just holds properties");
-    }
-
-    public static Instance getInstance(JobContext  conf) {
-        return InputFormatBase.getInstance(conf);
-    }
-
-    public static AuthenticationToken getPassword(JobContext  conf) {
-        return InputFormatBase.getAuthenticationToken(conf);
-    }
-
-    public static String getUsername(JobContext conf) {
-        return InputFormatBase.getPrincipal(conf);
-    }
-
-    public static String getTablename(JobContext conf) {
-        return InputFormatBase.getInputTableName(conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
deleted file mode 100644
index c3003d3..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.openrdf.model.URI;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-/**
- * Class MRSailUtils
- * Date: May 19, 2011
- * Time: 10:34:06 AM
- */
-public class MRUtils {
-
-    public static final String JOB_NAME_PROP = "mapred.job.name";
-
-    public static final String AC_USERNAME_PROP = "ac.username";
-    public static final String AC_PWD_PROP = "ac.pwd";
-    public static final String AC_ZK_PROP = "ac.zk";
-    public static final String AC_INSTANCE_PROP = "ac.instance";
-    public static final String AC_TTL_PROP = "ac.ttl";
-    public static final String AC_TABLE_PROP = "ac.table";
-    public static final String AC_AUTH_PROP = "ac.auth";
-    public static final String AC_CV_PROP = "ac.cv";
-    public static final String AC_MOCK_PROP = "ac.mock";
-    public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput";
-    public static final String HADOOP_IO_SORT_MB = "ac.hdfsinput";
-    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
-    public static final String FORMAT_PROP = "rdf.format";
-    public static final String INPUT_PATH = "input";
-
-    public static final String NAMED_GRAPH_PROP = "rdf.graph";
-
-    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
-
-    // rdf constants
-    public static final ValueFactory vf = new ValueFactoryImpl();
-    public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type");
-
-
-    // cloudbase map reduce utils
-
-//    public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException {
-//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
-//        startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key));
-//        if (entry_val != null) {
-//            startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES);
-//            startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val));
-//        }
-//        byte[] startrow = startRowOut.toByteArray();
-//        startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES);
-//        byte[] stoprow = startRowOut.toByteArray();
-//
-//        Range range = new Range(new Text(startrow), new Text(stoprow));
-//        return range;
-//    }
-
-
-    public static String getACTtl(Configuration conf) {
-        return conf.get(AC_TTL_PROP);
-    }
-
-    public static String getACUserName(Configuration conf) {
-        return conf.get(AC_USERNAME_PROP);
-    }
-
-    public static String getACPwd(Configuration conf) {
-        return conf.get(AC_PWD_PROP);
-    }
-
-    public static String getACZK(Configuration conf) {
-        return conf.get(AC_ZK_PROP);
-    }
-
-    public static String getACInstance(Configuration conf) {
-        return conf.get(AC_INSTANCE_PROP);
-    }
-
-    public static void setACUserName(Configuration conf, String str) {
-        conf.set(AC_USERNAME_PROP, str);
-    }
-
-    public static void setACPwd(Configuration conf, String str) {
-        conf.set(AC_PWD_PROP, str);
-    }
-
-    public static void setACZK(Configuration conf, String str) {
-        conf.set(AC_ZK_PROP, str);
-    }
-
-    public static void setACInstance(Configuration conf, String str) {
-        conf.set(AC_INSTANCE_PROP, str);
-    }
-
-    public static void setACTtl(Configuration conf, String str) {
-        conf.set(AC_TTL_PROP, str);
-    }
-}
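
MRUtils above is only a property bag for the "ac.*" and "rdf.*" keys that the MapReduce tools share. A minimal sketch of how a job driver would stash the Accumulo connection settings on a Hadoop Configuration and later read them back (instance, ZooKeeper, user, password, and prefix values below are placeholders):

    import mvm.rya.accumulo.mr.utils.MRUtils;
    import org.apache.hadoop.conf.Configuration;

    public class MRUtilsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Driver side: record the connection settings under the "ac.*" keys.
            MRUtils.setACInstance(conf, "instanceName");   // placeholder instance name
            MRUtils.setACZK(conf, "zkhost:2181");          // placeholder ZooKeeper quorum
            MRUtils.setACUserName(conf, "root");
            MRUtils.setACPwd(conf, "secret");
            conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");

            // Task side: the same Configuration ships with the job, so mappers and
            // reducers read the values back through the matching getters.
            String instance = MRUtils.getACInstance(conf);
            String zk = MRUtils.getACZK(conf);
            System.out.println(instance + " @ " + zk);
        }
    }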

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
deleted file mode 100644
index 1d0d9c9..0000000
--- 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
+++ /dev/null
@@ -1,402 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaRange;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TableLayoutStrategy;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.persist.query.BatchRyaQuery;
-import mvm.rya.api.persist.query.RyaQuery;
-import mvm.rya.api.persist.query.RyaQueryEngine;
-import mvm.rya.api.query.strategy.ByteRange;
-import mvm.rya.api.query.strategy.TriplePatternStrategy;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRowRegex;
-import mvm.rya.api.utils.CloseableIterableIteration;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.iterators.user.TimestampFilter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.calrissian.mango.collect.CloseableIterable;
-import org.calrissian.mango.collect.CloseableIterables;
-import org.calrissian.mango.collect.FluentCloseableIterable;
-import org.openrdf.query.BindingSet;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterators;
-
-/**
- * Date: 7/17/12
- * Time: 9:28 AM
- */
-public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> {
-
-    private AccumuloRdfConfiguration configuration;
-    private Connector connector;
-    private RyaTripleContext ryaContext;
-    private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>();
-
-    public AccumuloRyaQueryEngine(Connector connector) {
-        this(connector, new AccumuloRdfConfiguration());
-    }
-
-    public AccumuloRyaQueryEngine(Connector connector, AccumuloRdfConfiguration conf) {
-        this.connector = connector;
-        this.configuration = conf;
-        ryaContext = RyaTripleContext.getInstance(conf);
-        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO, ryaContext));
-        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO, ryaContext));
-        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP, ryaContext));
-    }
-
-    @Override
-    public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-
-        RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build();
-        CloseableIterable<RyaStatement> results = query(ryaQuery);
-
-        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
-    }
-
-    protected String getData(RyaType ryaType) {
-        return (ryaType != null) ? (ryaType.getData()) : (null);
-    }
-
-    @Override
-    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-        //query configuration
-        Authorizations authorizations = conf.getAuthorizations();
-        Long ttl = conf.getTtl();
-        Long maxResults = conf.getLimit();
-        Integer maxRanges = conf.getMaxRangesForScanner();
-        Integer numThreads = conf.getNumThreads();
-
-        //TODO: cannot span multiple tables here
-        try {
-            Collection<Range> ranges = new HashSet<Range>();
-            RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
-            TABLE_LAYOUT layout = null;
-            RyaURI context = null;
-            TriplePatternStrategy strategy = null;
-            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
-                RyaStatement stmt = stmtbs.getKey();
-                context = stmt.getContext(); //TODO: This will be overwritten
-                BindingSet bs = stmtbs.getValue();
-                strategy = ryaContext.retrieveStrategy(stmt);
-                if (strategy == null) {
-                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
-                }
-
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
-                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);
-
-                //use range to set scanner
-                //populate scanner based on authorizations, ttl
-                layout = entry.getKey();
-                ByteRange byteRange = entry.getValue();
-                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
-                ranges.add(range);
-                rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
-            }
-            //no ranges
-            if (layout == null) return null;
-            String regexSubject = conf.getRegexSubject();
-            String regexPredicate = conf.getRegexPredicate();
-            String regexObject = conf.getRegexObject();
-            TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
-
-            String table = layoutToTable(layout, conf);
-            boolean useBatchScanner = ranges.size() > maxRanges;
-            RyaStatementBindingSetKeyValueIterator iterator = null;
-            if (useBatchScanner) {
-                ScannerBase scanner = connector.createBatchScanner(table, authorizations, numThreads);
-                ((BatchScanner) scanner).setRanges(ranges);
-                fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf);
-                iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap);
-            } else {
-                Scanner scannerBase = null;
-                Iterator<Map.Entry<Key, Value>>[] iters = new Iterator[ranges.size()];
-                int i = 0;
-                for (Range range : ranges) {
-                    scannerBase = connector.createScanner(table, authorizations);
-                    scannerBase.setRange(range);
-                    fillScanner(scannerBase, context, null, ttl, null, tripleRowRegex, conf);
-                    iters[i] = scannerBase.iterator();
-                    i++;
-                }
-                iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters), rangeMap, ryaContext);
-            }
-            if (maxResults != null) {
-                iterator.setMaxResults(maxResults);
-            }
-            return iterator;
-        } catch (Exception e) {
-            throw new RyaDAOException(e);
-        }
-
-    }
-
-    @Override
-    public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, AccumuloRdfConfiguration conf)
-            throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-
-        BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build();
-        CloseableIterable<RyaStatement> results = query(batchRyaQuery);
-
-        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
-    }
-
-    @Override
-    public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException {
-        Preconditions.checkNotNull(ryaQuery);
-        RyaStatement stmt = ryaQuery.getQuery();
-        Preconditions.checkNotNull(stmt);
-
-        //query configuration
-        String[] auths = ryaQuery.getAuths();
-        Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
-        Long ttl = ryaQuery.getTtl();
-        Long currentTime = ryaQuery.getCurrentTime();
-        Long maxResults = ryaQuery.getMaxResults();
-        Integer batchSize = ryaQuery.getBatchSize();
-        String regexSubject = ryaQuery.getRegexSubject();
-        String regexPredicate = ryaQuery.getRegexPredicate();
-        String regexObject = ryaQuery.getRegexObject();
-        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
-
-        try {
-            //find triple pattern range
-            TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
-            TABLE_LAYOUT layout;
-            Range range;
-            RyaURI subject = stmt.getSubject();
-            RyaURI predicate = stmt.getPredicate();
-            RyaType object = stmt.getObject();
-            RyaURI context = stmt.getContext();
-            String qualifier = stmt.getQualifer();
-            TripleRowRegex tripleRowRegex = null;
-            if (strategy != null) {
-                //otherwise, full table scan is supported
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
-                        strategy.defineRange(subject, predicate, object, context, null);
-                layout = entry.getKey();
-                ByteRange byteRange = entry.getValue();
-                range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
-
-                byte[] objectTypeInfo = null;
-                if (object != null) {
-                    //TODO: Not good to serialize this twice
-                    if (object instanceof RyaRange) {
-                        objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
-                    } else {
-                        objectTypeInfo = RyaContext.getInstance().serializeType(object)[1];
-                    }
-                }
-
-                tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo);
-            } else {
-                range = new Range();
-                layout = TABLE_LAYOUT.SPO;
-            }
-
-            //use range to set scanner
-            //populate scanner based on authorizations, ttl
-            String table = layoutToTable(layout, tableLayoutStrategy);
-            Scanner scanner = connector.createScanner(table, authorizations);
-            scanner.setRange(range);
-            if (batchSize != null) {
-                scanner.setBatchSize(batchSize);
-            }
-            fillScanner(scanner, context, qualifier, ttl, currentTime, tripleRowRegex, ryaQuery.getConf());
-
-            FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner))
-                    .transform(keyValueToRyaStatementFunctionMap.get(layout));
-            if (maxResults != null) {
-                results = results.limit(maxResults.intValue());
-            }
-
-            return results;
-        } catch (Exception e) {
-            throw new RyaDAOException(e);
-        }
-    }
-
-    @Override
-    public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException {
-        Preconditions.checkNotNull(ryaQuery);
-        Iterable<RyaStatement> stmts = ryaQuery.getQueries();
-        Preconditions.checkNotNull(stmts);
-
-        //query configuration
-        String[] auths = ryaQuery.getAuths();
-        final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
-        final Long ttl = ryaQuery.getTtl();
-        Long currentTime = ryaQuery.getCurrentTime();
-        Long maxResults = ryaQuery.getMaxResults();
-        Integer batchSize = ryaQuery.getBatchSize();
-        Integer numQueryThreads = ryaQuery.getNumQueryThreads();
-        String regexSubject = ryaQuery.getRegexSubject();
-        String regexPredicate = ryaQuery.getRegexPredicate();
-        String regexObject = ryaQuery.getRegexObject();
-        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
-        int maxRanges = ryaQuery.getMaxRanges();
-
-        //TODO: cannot span multiple tables here
-        try {
-            Collection<Range> ranges = new HashSet<Range>();
-            TABLE_LAYOUT layout = null;
-            RyaURI context = null;
-            TriplePatternStrategy strategy = null;
-            for (RyaStatement stmt : stmts) {
-                context = stmt.getContext(); //TODO: This will be overwritten
-                strategy = ryaContext.retrieveStrategy(stmt);
-                if (strategy == null) {
-                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
-                }
-
-                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
-                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);
-
-                //use range to set scanner
-                //populate scanner based on authorizations, ttl
-                layout = entry.getKey();
-                ByteRange byteRange = entry.getValue();
-                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
-                ranges.add(range);
-            }
-            //no ranges
-            if (layout == null) throw new IllegalArgumentException("No table layout specified");
-
-            final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
-
-            final String table = layoutToTable(layout, tableLayoutStrategy);
-            boolean useBatchScanner = ranges.size() > maxRanges;
-            FluentCloseableIterable<RyaStatement> results = null;
-            if (useBatchScanner) {
-                BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads);
-                scanner.setRanges(ranges);
-                fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf());
-                results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
-            } else {
-                final RyaURI fcontext = context;
-                final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf();
-                FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() {
-                    @Override
-                    public Iterable<Map.Entry<Key, Value>> apply(Range range) {
-                        try {
-                            Scanner scanner = connector.createScanner(table, authorizations);
-                            scanner.setRange(range);
-                            fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf);
-                            return scanner;
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }).transform(keyValueToRyaStatementFunctionMap.get(layout));
-
-                results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
-            }
-            if (maxResults != null) {
-                results = results.limit(maxResults.intValue());
-            }
-            return results;
-        } catch (Exception e) {
-            throw new RyaDAOException(e);
-        }
-    }
-
-    protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException {
-        if (context != null && qualifier != null) {
-            scanner.fetchColumn(new Text(context.getData()), new Text(qualifier));
-        } else if (context != null) {
-            scanner.fetchColumnFamily(new Text(context.getData()));
-        } else if (qualifier != null) {
-            IteratorSetting setting = new IteratorSetting(8, "riq", RegExFilter.class.getName());
-            RegExFilter.setRegexs(setting, null, null, qualifier, null, false);
-            scanner.addScanIterator(setting);
-        }
-        if (ttl != null) {
-            IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName());
-            TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true);
-            if(currentTime != null){
-                TimestampFilter.setStart(setting, currentTime - ttl, true);
-                TimestampFilter.setEnd(setting, currentTime, true);
-            }
-            scanner.addScanIterator(setting);
-        }
-        if (tripleRowRegex != null) {
-            IteratorSetting setting = new IteratorSetting(11, "ri", RegExFilter.class.getName());
-            String regex = tripleRowRegex.getRow();
-            RegExFilter.setRegexs(setting, regex, null, null, null, false);
-            scanner.addScanIterator(setting);
-        }
-    }
-
-    @Override
-    public void setConf(AccumuloRdfConfiguration conf) {
-        this.configuration = conf;
-    }
-
-    @Override
-    public AccumuloRdfConfiguration getConf() {
-        return configuration;
-    }
-}
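
The removed engine is constructed from a Connector plus an AccumuloRdfConfiguration and driven through the RyaQuery builder used in query(RyaQuery) above. A minimal, hypothetical usage sketch (the subject-only pattern and the three-argument RyaStatement constructor are assumptions drawn from the wider Rya API; connector creation is elided):

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaURI;
    import mvm.rya.api.persist.query.RyaQuery;
    import org.apache.accumulo.core.client.Connector;
    import org.calrissian.mango.collect.CloseableIterable;

    public class QueryEngineSketch {
        // Returns every statement whose subject matches the given URI.
        public static CloseableIterable<RyaStatement> statementsForSubject(Connector connector, String subjectUri) throws Exception {
            AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
            AccumuloRyaQueryEngine engine = new AccumuloRyaQueryEngine(connector, conf);

            // Subject-only pattern: the null predicate/object act as wildcards, so the
            // engine's TriplePatternStrategy picks the SPO layout and a subject-prefixed Range.
            RyaStatement pattern = new RyaStatement(new RyaURI(subjectUri), null, null);
            RyaQuery query = RyaQuery.builder(pattern).load(conf).build();

            // The caller is responsible for closing the returned iterable.
            return engine.query(query);
        }
    }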

