http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java
new file mode 100644
index 0000000..aa5157b
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java
@@ -0,0 +1,39 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+
+/**
+ * Interface AccumuloRdfConstants
+ * Date: Mar 1, 2012
+ * Time: 7:24:52 PM
+ */
+public interface AccumuloRdfConstants {
+    public static final Authorizations ALL_AUTHORIZATIONS = Constants.NO_AUTHS;
+
+    public static final Value EMPTY_VALUE = new Value(new byte[0]);
+
+    public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new 
byte[0]);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
new file mode 100644
index 0000000..65fad20
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
@@ -0,0 +1,172 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreStatement;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RdfDAOException;
+import mvm.rya.api.persist.RdfEvalStatsDAO;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Value;
+
+/**
+ * Class CloudbaseRdfEvalStatsDAO
+ * Date: Feb 28, 2012
+ * Time: 5:03:16 PM
+ */
+public class AccumuloRdfEvalStatsDAO implements 
RdfEvalStatsDAO<AccumuloRdfConfiguration> {
+
+    private boolean initialized = false;
+    private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+    private Collection<RdfCloudTripleStoreStatement> statements = new 
ArrayList<RdfCloudTripleStoreStatement>();
+    private Connector connector;
+
+    //    private String evalTable = TBL_EVAL;
+    private TableLayoutStrategy tableLayoutStrategy;
+
+    @Override
+    public void init() throws RdfDAOException {
+        try {
+            if (isInitialized()) {
+                throw new IllegalStateException("Already initialized");
+            }
+            checkNotNull(connector);
+            tableLayoutStrategy = conf.getTableLayoutStrategy();
+//            evalTable = 
conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable);
+//            conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, 
evalTable);
+
+            TableOperations tos = connector.tableOperations();
+            AccumuloRdfUtils.createTableIfNotExist(tos, 
tableLayoutStrategy.getEval());
+//            boolean tableExists = tos.exists(evalTable);
+//            if (!tableExists)
+//                tos.create(evalTable);
+            initialized = true;
+        } catch (Exception e) {
+            throw new RdfDAOException(e);
+        }
+    }
+
+ 
+    @Override
+    public void destroy() throws RdfDAOException {
+        if (!isInitialized()) {
+            throw new IllegalStateException("Not initialized");
+        }
+        initialized = false;
+    }
+
+    @Override
+    public boolean isInitialized() throws RdfDAOException {
+        return initialized;
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    public AccumuloRdfConfiguration getConf() {
+        return conf;
+    }
+
+    public void setConf(AccumuloRdfConfiguration conf) {
+        this.conf = conf;
+    }
+
+       @Override
+       public double getCardinality(AccumuloRdfConfiguration conf,
+                       mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+                       List<Value> val, Resource context) throws 
RdfDAOException {
+        try {
+            Authorizations authorizations = conf.getAuthorizations();
+            Scanner scanner = 
connector.createScanner(tableLayoutStrategy.getEval(), authorizations);
+            Text cfTxt = null;
+            if (CARDINALITY_OF.SUBJECT.equals(card)) {
+                cfTxt = SUBJECT_CF_TXT;
+            } else if (CARDINALITY_OF.PREDICATE.equals(card)) {
+                cfTxt = PRED_CF_TXT;
+            } else if (CARDINALITY_OF.OBJECT.equals(card)) {
+//                cfTxt = OBJ_CF_TXT;     //TODO: How do we do object 
cardinality
+                return Double.MAX_VALUE;
+            } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) {
+                cfTxt = SUBJECTOBJECT_CF_TXT;
+            } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) {
+                cfTxt = SUBJECTPRED_CF_TXT;
+            } else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) {
+                cfTxt = PREDOBJECT_CF_TXT;
+            } else throw new IllegalArgumentException("Not right Cardinality[" 
+ card + "]");
+            Text cq = EMPTY_TEXT;
+            if (context != null) {
+                cq = new Text(context.stringValue().getBytes());
+            }
+            scanner.fetchColumn(cfTxt, cq);
+            Iterator<Value> vals = val.iterator();
+            String compositeIndex = vals.next().stringValue();
+            while (vals.hasNext()){
+               compositeIndex += DELIM + vals.next().stringValue();
+            }
+            scanner.setRange(new Range(new Text(compositeIndex.getBytes())));
+            Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter 
= scanner.iterator();
+            if (iter.hasNext()) {
+                return Double.parseDouble(new 
String(iter.next().getValue().get()));
+            }
+        } catch (Exception e) {
+            throw new RdfDAOException(e);
+        }
+
+        //default
+        return -1;
+       }
+
+       @Override
+       public double getCardinality(AccumuloRdfConfiguration conf,
+                       mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+                       List<Value> val) throws RdfDAOException {
+               return getCardinality(conf, card, val, null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
new file mode 100644
index 0000000..13ae37f
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
@@ -0,0 +1,297 @@
+//package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+//
+//import com.google.common.collect.Iterators;
+//import com.google.common.io.ByteArrayDataInput;
+//import com.google.common.io.ByteStreams;
+//import info.aduna.iteration.CloseableIteration;
+//import mvm.rya.api.RdfCloudTripleStoreConstants;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.persist.RdfDAOException;
+//import mvm.rya.api.utils.NullableStatementImpl;
+//import org.apache.accumulo.core.client.*;
+//import org.apache.accumulo.core.data.Key;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+//import org.apache.accumulo.core.iterators.user.TimestampFilter;
+//import org.apache.accumulo.core.security.Authorizations;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Resource;
+//import org.openrdf.model.Statement;
+//import org.openrdf.model.URI;
+//import org.openrdf.model.Value;
+//import org.openrdf.query.BindingSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.Collection;
+//import java.util.Collections;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.Map.Entry;
+//
+//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue;
+//
+//public class AccumuloRdfQueryIterator implements
+//        CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> {
+//
+//    protected final Logger logger = LoggerFactory.getLogger(getClass());
+//
+//    private boolean open = false;
+//    private Iterator result;
+//    private Resource[] contexts;
+//    private Collection<Entry<Statement, BindingSet>> statements;
+//    private int numOfThreads = 20;
+//
+//    private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
+//    private ScannerBase scanner;
+//    private boolean isBatchScanner = true;
+//    private Statement statement;
+//    Iterator<BindingSet> iter_bss = null;
+//
+//    private boolean hasNext = true;
+//    private AccumuloRdfConfiguration conf;
+//    private TABLE_LAYOUT tableLayout;
+//    private Text context_txt;
+//
+//    private DefineTripleQueryRangeFactory queryRangeFactory = new 
DefineTripleQueryRangeFactory();
+//
+//    public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> 
statements, Connector connector, Resource... contexts)
+//            throws RdfDAOException {
+//        this(statements, connector, null, contexts);
+//    }
+//
+//    public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> 
statements, Connector connector,
+//                                    AccumuloRdfConfiguration conf, 
Resource... contexts)
+//            throws RdfDAOException {
+//        this.statements = statements;
+//        this.contexts = contexts;
+//        this.conf = conf;
+//        initialize(connector);
+//        open = true;
+//    }
+//
+//    public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value 
object, Connector connector,
+//                                    AccumuloRdfConfiguration conf, 
Resource[] contexts) throws RdfDAOException {
+//        this(Collections.<Entry<Statement, BindingSet>>singleton(new 
RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(
+//                new NullableStatementImpl(subject, predicate, object, 
contexts),
+//                null)), connector, conf, contexts);
+//    }
+//
+//    protected void initialize(Connector connector)
+//            throws RdfDAOException {
+//        try {
+//            //TODO: We cannot span multiple tables here
+//            Collection<Range> ranges = new HashSet<Range>();
+//
+//            result = Iterators.emptyIterator();
+//            Long startTime = conf.getStartTime();
+//            Long ttl = conf.getTtl();
+//
+//            Resource context = null;
+//            for (Entry<Statement, BindingSet> stmtbs : statements) {
+//                Statement stmt = stmtbs.getKey();
+//                Resource subject = stmt.getSubject();
+//                URI predicate = stmt.getPredicate();
+//                Value object = stmt.getObject();
+//                context = stmt.getContext(); //TODO: assumes the same 
context for all statements
+//                logger.debug("Batch Scan, lookup subject[" + subject + "] 
predicate[" + predicate + "] object[" + object + "] combination");
+//
+//                Entry<TABLE_LAYOUT, Range> entry = 
queryRangeFactory.defineRange(subject, predicate, object, conf);
+//                tableLayout = entry.getKey();
+////                isTimeRange = isTimeRange || 
queryRangeFactory.isTimeRange();
+//                Range range = entry.getValue();
+//                ranges.add(range);
+//                rangeMap.ranges.add(new 
RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, 
stmtbs.getValue()));
+//            }
+//
+//            Authorizations authorizations = 
AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+//            String auth = conf.getAuth();
+//            if (auth != null) {
+//                authorizations = new Authorizations(auth.split(","));
+//            }
+//            String table = 
RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf);
+//            result = createScanner(connector, authorizations, table, 
context, startTime, ttl, ranges);
+////            if (isBatchScanner) {
+////                ((BatchScanner) scanner).setRanges(ranges);
+////            } else {
+////                for (Range range : ranges) {
+////                    ((Scanner) scanner).setRange(range); //TODO: Not good 
way of doing this
+////                }
+////            }
+////
+////            if (isBatchScanner) {
+////                result = ((BatchScanner) scanner).iterator();
+////            } else {
+////                result = ((Scanner) scanner).iterator();
+////            }
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> 
createScanner(Connector connector, Authorizations authorizations, String table, 
Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws 
TableNotFoundException, IOException {
+////        ShardedConnector shardedConnector = new 
ShardedConnector(connector, 4, ta)
+//        if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary 
number, make configurable
+//            BatchScanner scannerBase = connector.createBatchScanner(table, 
authorizations, numOfThreads);
+//            scannerBase.setRanges(ranges);
+//            populateScanner(context, startTime, ttl, scannerBase);
+//            return scannerBase.iterator();
+//        } else {
+//            isBatchScanner = false;
+//            Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] 
iters = new Iterator[ranges.size()];
+//            int i = 0;
+//            for (Range range : ranges) {
+//                Scanner scannerBase = connector.createScanner(table, 
authorizations);
+//                populateScanner(context, startTime, ttl, scannerBase);
+//                scannerBase.setRange(range);
+//                iters[i] = scannerBase.iterator();
+//                i++;
+//                scanner = scannerBase; //TODO: Always overridden, but 
doesn't matter since Scanner doesn't need to be closed
+//            }
+//            return Iterators.concat(iters);
+//        }
+//
+//    }
+//
+//    protected void populateScanner(Resource context, Long startTime, Long 
ttl, ScannerBase scannerBase) throws IOException {
+//        if (context != null) { //default graph
+//            context_txt = new Text(writeValue(context));
+//            scannerBase.fetchColumnFamily(context_txt);
+//        }
+//
+////        if (!isQueryTimeBased(conf)) {
+//        if (startTime != null && ttl != null) {
+////            scannerBase.setScanIterators(1, 
FilteringIterator.class.getName(), "filteringIterator");
+////            scannerBase.setScanIteratorOption("filteringIterator", "0", 
TimeRangeFilter.class.getName());
+////            scannerBase.setScanIteratorOption("filteringIterator", "0." + 
TimeRangeFilter.TIME_RANGE_PROP, ttl);
+////            scannerBase.setScanIteratorOption("filteringIterator", "0." + 
TimeRangeFilter.START_TIME_PROP, startTime);
+//            IteratorSetting setting = new IteratorSetting(1, "fi", 
TimestampFilter.class.getName());
+//            TimestampFilter.setStart(setting, startTime, true);
+//            TimestampFilter.setEnd(setting, startTime + ttl, true);
+//            scannerBase.addScanIterator(setting);
+//        } else if (ttl != null) {
+////                scannerBase.setScanIterators(1, 
FilteringIterator.class.getName(), "filteringIterator");
+////                scannerBase.setScanIteratorOption("filteringIterator", 
"0", AgeOffFilter.class.getName());
+////                scannerBase.setScanIteratorOption("filteringIterator", 
"0.ttl", ttl);
+//            IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class.getName());
+//            AgeOffFilter.setTTL(setting, ttl);
+//            scannerBase.addScanIterator(setting);
+//        }
+////        }
+//    }
+//
+//    @Override
+//    public void close() throws RdfDAOException {
+//        if (!open)
+//            return;
+//        verifyIsOpen();
+//        open = false;
+//        if (scanner != null && isBatchScanner) {
+//            ((BatchScanner) scanner).close();
+//        }
+//    }
+//
+//    public void verifyIsOpen() throws RdfDAOException {
+//        if (!open) {
+//            throw new RdfDAOException("Iterator not open");
+//        }
+//    }
+//
+//    @Override
+//    public boolean hasNext() throws RdfDAOException {
+//        try {
+//            if (!open)
+//                return false;
+//            verifyIsOpen();
+//            /**
+//             * For some reason, the result.hasNext returns false
+//             * once at the end of the iterator, and then true
+//             * for every subsequent call.
+//             */
+//            hasNext = (hasNext && result.hasNext());
+//            return hasNext || ((iter_bss != null) && iter_bss.hasNext());
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    @Override
+//    public Entry<Statement, BindingSet> next() throws RdfDAOException {
+//        try {
+//            if (!this.hasNext())
+//                return null;
+//
+//            return getStatement(result, contexts);
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    public Entry<Statement, BindingSet> getStatement(
+//            Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> 
rowResults,
+//            Resource... filterContexts) throws IOException {
+//        try {
+//            while (true) {
+//                if (iter_bss != null && iter_bss.hasNext()) {
+//                    return new 
RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, 
iter_bss.next());
+//                }
+//
+//                if (rowResults.hasNext()) {
+//                    Entry<Key, org.apache.accumulo.core.data.Value> entry = 
rowResults.next();
+//                    Key key = entry.getKey();
+//                    ByteArrayDataInput input = 
ByteStreams.newDataInput(key.getRow().getBytes());
+//                    statement = 
RdfCloudTripleStoreUtils.translateStatementFromRow(input, 
key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY);
+//                    iter_bss = rangeMap.containsKey(key).iterator();
+//                } else
+//                    break;
+//            }
+//        } catch (Exception e) {
+//            throw new IOException(e);
+//        }
+//        return null;
+//    }
+//
+//    @Override
+//    public void remove() throws RdfDAOException {
+//        next();
+//    }
+//
+//    public int getNumOfThreads() {
+//        return numOfThreads;
+//    }
+//
+//    public void setNumOfThreads(int numOfThreads) {
+//        this.numOfThreads = numOfThreads;
+//    }
+//
+//    public DefineTripleQueryRangeFactory getQueryRangeFactory() {
+//        return queryRangeFactory;
+//    }
+//
+//    public void setQueryRangeFactory(DefineTripleQueryRangeFactory 
queryRangeFactory) {
+//        this.queryRangeFactory = queryRangeFactory;
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java
new file mode 100644
index 0000000..d3b651f
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java
@@ -0,0 +1,71 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.resolver.triple.TripleRow;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
+
+/**
+ * Class AccumuloRdfUtils
+ * Date: Mar 1, 2012
+ * Time: 7:15:54 PM
+ */
+public class AccumuloRdfUtils {
+    private static final Log logger = 
LogFactory.getLog(AccumuloRdfUtils.class);
+
+    public static void createTableIfNotExist(TableOperations tableOperations, 
String tableName) throws AccumuloException, AccumuloSecurityException, 
TableExistsException {
+        boolean tableExists = tableOperations.exists(tableName);
+        if (!tableExists) {
+            logger.debug("Creating accumulo table: " + tableName);
+            tableOperations.create(tableName);
+        }
+    }
+
+    public static Key from(TripleRow tripleRow) {
+        return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES),
+                defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE));
+    }
+
+    public static Value extractValue(TripleRow tripleRow) {
+        return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES));
+    }
+
+    private static byte[] defaultTo(byte[] bytes, byte[] def) {
+        return bytes != null ? bytes : def;
+    }
+
+    private static Long defaultTo(Long l, Long def) {
+        return l != null ? l : def;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
new file mode 100644
index 0000000..50744dd
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
@@ -0,0 +1,522 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
+import static 
mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * Class AccumuloRyaDAO
+ * Date: Feb 29, 2012
+ * Time: 12:37:22 PM
+ */
+public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, 
RyaNamespaceManager<AccumuloRdfConfiguration> {
+    private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
+
+    private boolean initialized = false;
+    private Connector connector;
+    private BatchWriterConfig batchWriterConfig;
+
+    private MultiTableBatchWriter mt_bw;
+
+    // Do not flush these individually
+    private BatchWriter bw_spo;
+    private BatchWriter bw_po;
+    private BatchWriter bw_osp;
+
+    private BatchWriter bw_ns;
+
+    private List<AccumuloIndexer> secondaryIndexers;
+    
+    private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+    private RyaTableMutationsFactory ryaTableMutationsFactory;
+    private TableLayoutStrategy tableLayoutStrategy;
+    private AccumuloRyaQueryEngine queryEngine;
+    private RyaTripleContext ryaContext;
+
+    @Override
+    public boolean isInitialized() throws RyaDAOException {
+        return initialized;
+    }
+
+    @Override
+    public void init() throws RyaDAOException {
+        if (initialized)
+            return;
+        try {
+            checkNotNull(conf);
+            checkNotNull(connector);
+
+            if(batchWriterConfig == null){
+                batchWriterConfig = new BatchWriterConfig();
+                batchWriterConfig.setMaxMemory(MAX_MEMORY);
+                batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS);
+                batchWriterConfig.setMaxWriteThreads(NUM_THREADS);
+            }
+
+            tableLayoutStrategy = conf.getTableLayoutStrategy();
+            ryaContext = RyaTripleContext.getInstance(conf);
+            ryaTableMutationsFactory = new 
RyaTableMutationsFactory(ryaContext);
+            
+            secondaryIndexers = conf.getAdditionalIndexers();
+            
+            TableOperations tableOperations = connector.tableOperations();
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, 
tableLayoutStrategy.getSpo());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, 
tableLayoutStrategy.getPo());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, 
tableLayoutStrategy.getOsp());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, 
tableLayoutStrategy.getNs());
+            
+            for (AccumuloIndexer index : secondaryIndexers) {
+                index.setConf(conf);
+            }
+
+            mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig);
+
+            //get the batch writers for tables
+            bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo());
+            bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
+            bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
+
+            bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), 
MAX_MEMORY,
+                    MAX_TIME, 1);
+            
+            for (AccumuloIndexer index : secondaryIndexers) {
+                index.setMultiTableBatchWriter(mt_bw);
+            }
+
+            queryEngine = new AccumuloRyaQueryEngine(connector, conf);
+
+            checkVersion();
+
+            initialized = true;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    public String getVersion() throws RyaDAOException {
+        String version = null;
+        CloseableIteration<RyaStatement, RyaDAOException> versIter = 
queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, 
null), conf);
+        if (versIter.hasNext()) {
+            version = versIter.next().getObject().getData();
+        }
+        versIter.close();
+
+        return version;
+    }
+
+    @Override
+    public void add(RyaStatement statement) throws RyaDAOException {
+        commit(Iterators.singletonIterator(statement));
+    }
+
+    @Override
+    public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+        commit(iter);
+    }
+
+    @Override
+    public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) 
throws RyaDAOException {
+        this.delete(Iterators.singletonIterator(stmt), aconf);
+        //TODO currently all indexers do not support delete
+    }
+
+    @Override
+    public void delete(Iterator<RyaStatement> statements, 
AccumuloRdfConfiguration conf) throws RyaDAOException {
+        try {
+            while (statements.hasNext()) {
+                RyaStatement stmt = statements.next();
+                //query first
+                CloseableIteration<RyaStatement, RyaDAOException> query = 
this.queryEngine.query(stmt, conf);
+                while (query.hasNext()) {
+                    deleteSingleRyaStatement(query.next());
+                }
+            }
+            mt_bw.flush();
+            //TODO currently all indexers do not support delete
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+    
+    @Override
+    public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) 
throws RyaDAOException {
+        BatchDeleter bd_spo = null;
+        BatchDeleter bd_po = null;
+        BatchDeleter bd_osp = null;
+
+        try {
+            bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), 
conf.getAuthorizations());
+            bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), 
conf.getAuthorizations());
+            bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), 
conf.getAuthorizations());
+
+            bd_spo.setRanges(Collections.singleton(new Range()));
+            bd_po.setRanges(Collections.singleton(new Range()));
+            bd_osp.setRanges(Collections.singleton(new Range()));
+
+            for (RyaURI graph : graphs){
+                bd_spo.fetchColumnFamily(new Text(graph.getData()));
+                bd_po.fetchColumnFamily(new Text(graph.getData()));
+                bd_osp.fetchColumnFamily(new Text(graph.getData()));
+            }
+            
+            bd_spo.delete();
+            bd_po.delete();
+            bd_osp.delete();
+            
+            //TODO indexers do not support delete-UnsupportedOperation 
Exception will be thrown
+//            for (AccumuloIndex index : secondaryIndexers) {
+//                index.dropGraph(graphs);
+//            }
+            
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        } finally {
+            if (bd_spo != null) bd_spo.close();
+            if (bd_po != null) bd_po.close();
+            if (bd_osp != null) bd_osp.close();
+        }
+        
+    }
+
+    protected void deleteSingleRyaStatement(RyaStatement stmt) throws 
TripleRowResolverException, MutationsRejectedException {
+        Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt);
+        bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO)));
+        bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO)));
+        bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP)));
+
+    }
+
+    protected Mutation deleteMutation(TripleRow tripleRow) {
+        Mutation m = new Mutation(new Text(tripleRow.getRow()));
+
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new 
Text(columnFamily);
+
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new 
Text(columnQualifier);
+
+        m.putDelete(cfText, cqText, new 
ColumnVisibility(tripleRow.getColumnVisibility()),
+                    tripleRow.getTimestamp());
+        return m;
+    }
+
+    protected void commit(Iterator<RyaStatement> commitStatements) throws 
RyaDAOException {
+        try {
+            //TODO: Should have a lock here in case we are adding and 
committing at the same time
+            while (commitStatements.hasNext()) {
+                RyaStatement stmt = commitStatements.next();
+                
+                Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = 
ryaTableMutationsFactory.serialize(stmt);
+                Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+                Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+                Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+                bw_spo.addMutations(spo);
+                bw_po.addMutations(po);
+                bw_osp.addMutations(osp);
+                
+                for (AccumuloIndexer index : secondaryIndexers) {
+                    index.storeStatement(stmt);
+                }
+            }
+
+            mt_bw.flush();
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void destroy() throws RyaDAOException {
+        if (!initialized) {
+            return;
+        }
+        //TODO: write lock
+        try {
+            initialized = false;
+            mt_bw.flush();
+            bw_ns.flush();
+
+            mt_bw.close();
+            bw_ns.close();
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void addNamespace(String pfx, String namespace) throws 
RyaDAOException {
+        try {
+            Mutation m = new Mutation(new Text(pfx));
+            m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new 
Value(namespace.getBytes()));
+            bw_ns.addMutation(m);
+            bw_ns.flush();
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public String getNamespace(String pfx) throws RyaDAOException {
+        try {
+            Scanner scanner = 
connector.createScanner(tableLayoutStrategy.getNs(),
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+            scanner.setRange(new Range(new Text(pfx)));
+            Iterator<Map.Entry<Key, Value>> iterator = scanner
+                    .iterator();
+
+            if (iterator.hasNext()) {
+                return new String(iterator.next().getValue().get());
+            }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+        return null;
+    }
+
+    @Override
+    public void removeNamespace(String pfx) throws RyaDAOException {
+        try {
+            Mutation del = new Mutation(new Text(pfx));
+            del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+            bw_ns.addMutation(del);
+            bw_ns.flush();
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() 
throws RyaDAOException {
+        try {
+            Scanner scanner = 
connector.createScanner(tableLayoutStrategy.getNs(),
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
+            Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+            return new AccumuloNamespaceTableIterator(result);
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() 
{
+        return this;
+    }
+
+    @Override
+    public void purge(RdfCloudTripleStoreConfiguration configuration) {
+        for (String tableName : getTables()) {
+            try {
+                purge(tableName, configuration.getAuths());
+                compact(tableName);
+            } catch (TableNotFoundException e) {
+                logger.error(e.getMessage());
+            } catch (MutationsRejectedException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void dropAndDestroy() throws RyaDAOException {
+        for (String tableName : getTables()) {
+            try {
+                drop(tableName);
+            } catch (AccumuloSecurityException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (AccumuloException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (TableNotFoundException e) {
+                logger.warn(e.getMessage());
+            }
+        }
+        destroy();
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    public BatchWriterConfig getBatchWriterConfig(){
+        return batchWriterConfig;
+    }
+
+    public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+        this.batchWriterConfig = batchWriterConfig;
+    }
+
+    protected MultiTableBatchWriter getMultiTableBatchWriter(){
+       return mt_bw;
+    }
+
+    public AccumuloRdfConfiguration getConf() {
+        return conf;
+    }
+
+    public void setConf(AccumuloRdfConfiguration conf) {
+        this.conf = conf;
+    }
+
+    public RyaTableMutationsFactory getRyaTableMutationsFactory() {
+        return ryaTableMutationsFactory;
+    }
+
+    public void setRyaTableMutationsFactory(RyaTableMutationsFactory 
ryaTableMutationsFactory) {
+        this.ryaTableMutationsFactory = ryaTableMutationsFactory;
+    }
+
+    public AccumuloRyaQueryEngine getQueryEngine() {
+        return queryEngine;
+    }
+
+    public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+        this.queryEngine = queryEngine;
+    }
+
+    protected String[] getTables() {
+        // core tables
+        List<String> tableNames = Lists.newArrayList(
+                tableLayoutStrategy.getSpo(), 
+                tableLayoutStrategy.getPo(), 
+                tableLayoutStrategy.getOsp(), 
+                tableLayoutStrategy.getNs(),
+                tableLayoutStrategy.getEval());
+        
+        // Additional Tables        
+        for (AccumuloIndexer index : secondaryIndexers) {
+            tableNames.add(index.getTableName());
+        }
+
+        return tableNames.toArray(new String[]{});
+    }
+
+    private void purge(String tableName, String[] auths) throws 
TableNotFoundException, MutationsRejectedException {
+        if (tableExists(tableName)) {
+            logger.info("Purging accumulo table: " + tableName);
+            BatchDeleter batchDeleter = createBatchDeleter(tableName, new 
Authorizations(auths));
+            try {
+                batchDeleter.setRanges(Collections.singleton(new Range()));
+                batchDeleter.delete();
+            } finally {
+                batchDeleter.close();
+            }
+        }
+    }
+
+    private void compact(String tableName) {
+        logger.info("Requesting major compaction for table " + tableName);
+        try {
+            connector.tableOperations().compact(tableName, null, null, true, 
false);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    private boolean tableExists(String tableName) {
+        return getConnector().tableOperations().exists(tableName);
+    }
+
+    private BatchDeleter createBatchDeleter(String tableName, Authorizations 
authorizations) throws TableNotFoundException {
+        return connector.createBatchDeleter(tableName, authorizations, 
NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
+    }
+
+    private void checkVersion() throws RyaDAOException {
+        String version = getVersion();
+        if (version == null) {
+            this.add(getVersionRyaStatement());
+        }
+        //TODO: Do a version check here
+    }
+
+    protected RyaStatement getVersionRyaStatement() {
+        return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, 
VERSION_RYA);
+    }
+
+    private void drop(String tableName) throws TableNotFoundException, 
AccumuloException, AccumuloSecurityException {
+        logger.info("Dropping cloudbase table: " + tableName);
+        connector.tableOperations().delete(tableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java
new file mode 100644
index 0000000..3e51ab5
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java
@@ -0,0 +1,152 @@
+//package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+//
+//import com.google.common.io.ByteArrayDataOutput;
+//import com.google.common.io.ByteStreams;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.domain.RangeValue;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Value;
+//import org.openrdf.model.ValueFactory;
+//import org.openrdf.model.impl.ValueFactoryImpl;
+//
+//import java.io.IOException;
+//import java.util.Map;
+//
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry;
+//
+///**
+// * Class DefineTripleQueryRangeFactory
+// * Date: Jun 2, 2011
+// * Time: 10:35:43 AM
+// */
+//public class DefineTripleQueryRangeFactory {
+//
+//    ValueFactory vf = ValueFactoryImpl.getInstance();
+//
+//    protected void fillRange(ByteArrayDataOutput startRowOut, 
ByteArrayDataOutput endRowOut, Value val, boolean empty)
+//            throws IOException {
+//        if(!empty) {
+//            startRowOut.write(DELIM_BYTES);
+//            endRowOut.write(DELIM_BYTES);
+//        }
+//        //null check?
+//        if(val instanceof RangeValue) {
+//            RangeValue rangeValue = (RangeValue) val;
+//            Value start = rangeValue.getStart();
+//            Value end = rangeValue.getEnd();
+//            byte[] start_val_bytes = 
RdfCloudTripleStoreUtils.writeValue(start);
+//            byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end);
+//            startRowOut.write(start_val_bytes);
+//            endRowOut.write(end_val_bytes);
+//        } else {
+//            byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val);
+//            startRowOut.write(val_bytes);
+//            endRowOut.write(val_bytes);
+//        }
+//    }
+//
+//    public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value 
predicate, Value object, AccumuloRdfConfiguration conf)
+//            throws IOException {
+//
+//        byte[] startrow, stoprow;
+//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
+//        ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput();
+//        Range range;
+//        TABLE_LAYOUT tableLayout;
+//
+//        if (subject != null) {
+//            /**
+//             * Case: s
+//             * Table: spo
+//             * Want this to be the first if statement since it will be most 
likely the most asked for table
+//             */
+//            tableLayout = TABLE_LAYOUT.SPO;
+//            fillRange(startRowOut, stopRowOut, subject, true);
+//            if (predicate != null) {
+//                /**
+//                 * Case: sp
+//                 * Table: spo
+//                 */
+//                fillRange(startRowOut, stopRowOut, predicate, false);
+//                if (object != null) {
+//                    /**
+//                     * Case: spo
+//                     * Table: spo
+//                     */
+//                    fillRange(startRowOut, stopRowOut, object, false);
+//                }
+//            } else if (object != null) {
+//                /**
+//                 * Case: so
+//                 * Table: osp
+//                 * Very rare case. Could have put this in the OSP if clause, 
but I wanted to reorder the if statement
+//                 * for best performance. The SPO table probably gets the 
most scans, so I want it to be the first if
+//                 * statement in the branch.
+//                 */
+//                tableLayout = TABLE_LAYOUT.OSP;
+//                startRowOut = ByteStreams.newDataOutput();
+//                stopRowOut = ByteStreams.newDataOutput();
+//                fillRange(startRowOut, stopRowOut, object, true);
+//                fillRange(startRowOut, stopRowOut, subject, false);
+//            }
+//        } else if (predicate != null) {
+//            /**
+//             * Case: p
+//             * Table: po
+//             * Wanted this to be the second if statement, since it will be 
the second most asked for table
+//             */
+//            tableLayout = TABLE_LAYOUT.PO;
+//            fillRange(startRowOut, stopRowOut, predicate, true);
+//            if (object != null) {
+//                /**
+//                 * Case: po
+//                 * Table: po
+//                 */
+//                fillRange(startRowOut, stopRowOut, object, false);
+//            }
+//        } else if (object != null) {
+//            /**
+//             * Case: o
+//             * Table: osp
+//             * Probably a pretty rare scenario
+//             */
+//            tableLayout = TABLE_LAYOUT.OSP;
+//            fillRange(startRowOut, stopRowOut, object, true);
+//        } else {
+//            tableLayout = TABLE_LAYOUT.SPO;
+//            stopRowOut.write(Byte.MAX_VALUE);
+//        }
+//
+//        startrow = startRowOut.toByteArray();
+//        stopRowOut.write(DELIM_STOP_BYTES);
+//        stoprow = stopRowOut.toByteArray();
+//        Text startRowTxt = new Text(startrow);
+//        Text stopRowTxt = new Text(stoprow);
+//        range = new Range(startRowTxt, stopRowTxt);
+//
+//        return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range);
+//    }
+//
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java
new file mode 100644
index 0000000..ee217be
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java
@@ -0,0 +1,114 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableKeyValues {
+    public static final ColumnVisibility EMPTY_CV = new ColumnVisibility();
+    public static final Text EMPTY_CV_TEXT = new 
Text(EMPTY_CV.getExpression());
+
+    RyaTripleContext instance;
+
+    private RyaStatement stmt;
+    private Collection<Map.Entry<Key, Value>> spo = new 
ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> po = new 
ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> osp = new 
ArrayList<Map.Entry<Key, Value>>();
+
+    public RyaTableKeyValues(RyaStatement stmt, 
RdfCloudTripleStoreConfiguration conf) {
+        this.stmt = stmt;
+        this.instance = RyaTripleContext.getInstance(conf);
+    }
+
+    public Collection<Map.Entry<Key, Value>> getSpo() {
+        return spo;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getPo() {
+        return po;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getOsp() {
+        return osp;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public RyaTableKeyValues invoke() throws IOException {
+        /**
+         * TODO: If there are contexts, do we still replicate the information 
into the default graph as well
+         * as the named graphs?
+         */try {
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt);
+            TripleRow tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            byte[] columnVisibility = tripleRow.getColumnVisibility();
+            Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new 
Text(columnVisibility);
+            Long timestamp = tripleRow.getTimestamp();
+            timestamp = timestamp == null ? 0l : timestamp;
+            byte[] value = tripleRow.getValue();
+            Value v = value == null ? EMPTY_VALUE : new Value(value);
+            spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+            po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+            osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "RyaTableKeyValues{" +
+                "statement=" + stmt +
+                ", spo=" + spo +
+                ", po=" + po +
+                ", o=" + osp +
+                '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java
new file mode 100644
index 0000000..094da63
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java
@@ -0,0 +1,101 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableMutationsFactory {
+
+    RyaTripleContext ryaContext;
+
+    public RyaTableMutationsFactory(RyaTripleContext ryaContext) {
+       this.ryaContext = ryaContext;
+    }
+
+    //TODO: Does this still need to be collections
+    public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
Collection<Mutation>> serialize(
+            RyaStatement stmt) throws IOException {
+
+        Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+        Collection<Mutation> po_muts = new ArrayList<Mutation>();
+        Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+        /**
+         * TODO: If there are contexts, do we still replicate the information 
into the default graph as well
+         * as the named graphs?
+         */
+        try {
+            Map<TABLE_LAYOUT, TripleRow> rowMap = 
ryaContext.serializeTriple(stmt);
+            TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+            spo_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+            po_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+            osp_muts.add(createMutation(tripleRow));
+        } catch (TripleRowResolverException fe) {
+            throw new IOException(fe);
+        }
+
+        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> 
mutations =
+                new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
Collection<Mutation>>();
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+        return mutations;
+    }
+
+    protected Mutation createMutation(TripleRow tripleRow) {
+        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+        byte[] columnVisibility = tripleRow.getColumnVisibility();
+        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new 
ColumnVisibility(columnVisibility);
+        Long timestamp = tripleRow.getTimestamp();
+        byte[] value = tripleRow.getValue();
+        Value v = value == null ? EMPTY_VALUE : new Value(value);
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new 
Text(columnQualifier);
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new 
Text(columnFamily);
+
+        mutation.put(cfText, cqText, cv, timestamp, v);
+        return mutation;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
new file mode 100644
index 0000000..2244f66
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
@@ -0,0 +1,39 @@
+package mvm.rya.accumulo.experimental;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public abstract class AbstractAccumuloIndexer implements AccumuloIndexer {
+
+    @Override
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws 
IOException {
+    }
+
+    @Override
+    public void storeStatements(Collection<RyaStatement> statements) throws 
IOException {
+        for (RyaStatement s : statements) {
+            storeStatement(s);
+        }
+    }
+
+    @Override
+    public void deleteStatement(RyaStatement stmt) throws IOException {
+    }
+
+    @Override
+    public void dropGraph(RyaURI... graphs) {
+    }
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
new file mode 100644
index 0000000..1bd75ba
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
@@ -0,0 +1,13 @@
+package mvm.rya.accumulo.experimental;
+
+import java.io.IOException;
+
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public interface AccumuloIndexer extends RyaSecondaryIndexer {
+    
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws 
IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
new file mode 100644
index 0000000..a148e6b
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
@@ -0,0 +1,163 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ */
+public abstract class AbstractAccumuloMRTool {
+
+    protected Configuration conf;
+    protected RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+    protected String userName = "root";
+    protected String pwd = "root";
+    protected String instance = "instance";
+    protected String zk = "zoo";
+    protected Authorizations authorizations = 
AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+    protected String ttl = null;
+    protected boolean mock = false;
+    protected boolean hdfsInput = false;
+    protected String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+
+    protected void init() {
+        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
+        ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
+        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
+        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
+        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
+        mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, mock);
+        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, hdfsInput);
+        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+        if (tablePrefix != null)
+            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                conf.get(MRUtils.TABLE_LAYOUT_PROP, 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+        String auth = conf.get(MRUtils.AC_AUTH_PROP);
+        if (auth != null)
+            authorizations = new Authorizations(auth.split(","));
+
+        if (!mock) {
+            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+            conf.setBoolean("mapred.reduce.tasks.speculative.execution", 
false);
+            conf.set("io.sort.mb", "256");
+        }
+
+        //set ttl
+        ttl = conf.get(MRUtils.AC_TTL_PROP);
+    }
+
+    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
+        // set up accumulo input
+        if (!hdfsInput) {
+            job.setInputFormatClass(AccumuloInputFormat.class);
+        } else {
+            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+        }
+        AccumuloInputFormat.setConnectorInfo(job, userName, new 
PasswordToken(pwd));
+        AccumuloInputFormat.setInputTableName(job, 
RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
+        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+        if (!mock) {
+            AccumuloInputFormat.setZooKeeperInstance(job, instance, zk);
+        } else {
+            AccumuloInputFormat.setMockInstance(job, instance);
+        }
+        if (ttl != null) {
+            IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class.getName());
+            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+            AccumuloInputFormat.addIterator(job, setting);
+        }
+    }
+
+    protected void setupOutputFormat(Job job, String outputTable) throws 
AccumuloSecurityException {
+        AccumuloOutputFormat.setConnectorInfo(job, userName, new 
PasswordToken(pwd));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+        if (!mock) {
+            AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
+        } else {
+            AccumuloOutputFormat.setMockInstance(job, instance);
+        }
+        job.setOutputFormatClass(AccumuloOutputFormat.class);
+    }
+
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public String getInstance() {
+        return instance;
+    }
+
+    public void setInstance(String instance) {
+        this.instance = instance;
+    }
+
+    public String getPwd() {
+        return pwd;
+    }
+
+    public void setPwd(String pwd) {
+        this.pwd = pwd;
+    }
+
+    public String getZk() {
+        return zk;
+    }
+
+    public void setZk(String zk) {
+        this.zk = zk;
+    }
+
+    public String getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(String ttl) {
+        this.ttl = ttl;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
new file mode 100644
index 0000000..0818b5a
--- /dev/null
+++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
@@ -0,0 +1,257 @@
+package mvm.rya.accumulo.mr.eval;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+/**
+ * Count subject, predicate, object. Save in table
+ * Class RdfCloudTripleStoreCountTool
+ * Date: Apr 12, 2011
+ * Time: 10:39:40 AM
+ * @deprecated
+ */
+public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements 
Tool {
+
+    public static void main(String[] args) {
+        try {
+
+            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), 
args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * cloudbase props
+     */
+
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
+
+        //initialize
+        init();
+
+        Job job = new Job(conf);
+        job.setJarByClass(AccumuloRdfCountTool.class);
+        setupInputFormat(job);
+
+        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new 
Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        // set mapper and reducer classes
+        job.setMapperClass(CountPiecesMapper.class);
+        job.setCombinerClass(CountPiecesCombiner.class);
+        job.setReducerClass(CountPiecesReducer.class);
+
+        String outputTable = tablePrefix + 
RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
+        setupOutputFormat(job, outputTable);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return 0;
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    public static class CountPiecesMapper extends Mapper<Key, Value, Text, 
LongWritable> {
+
+        public static final byte[] EMPTY_BYTES = new byte[0];
+        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+
+        ValueFactoryImpl vf = new ValueFactoryImpl();
+
+        private Text keyOut = new Text();
+        private LongWritable valOut = new LongWritable(1);
+        private RyaTripleContext ryaContext;
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                    conf.get(MRUtils.TABLE_LAYOUT_PROP, 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+            ryaContext = RyaTripleContext.getInstance(new 
AccumuloRdfConfiguration(conf));
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws 
IOException, InterruptedException {
+            try {
+                RyaStatement statement = 
ryaContext.deserializeTriple(tableLayout, new 
TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), 
key.getColumnQualifier().getBytes()));
+                //count each piece subject, pred, object
+
+                String subj = statement.getSubject().getData();
+                String pred = statement.getPredicate().getData();
+//                byte[] objBytes = 
tripleFormat.getValueFormat().serialize(statement.getObject());
+                RyaURI scontext = statement.getContext();
+                boolean includesContext = scontext != null;
+                String scontext_str = (includesContext) ? scontext.getData() : 
null;
+
+                ByteArrayDataOutput output = ByteStreams.newDataOutput();
+                output.writeUTF(subj);
+                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+
+                output = ByteStreams.newDataOutput();
+                output.writeUTF(pred);
+                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+            } catch (TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public static class CountPiecesCombiner extends Reducer<Text, 
LongWritable, Text, LongWritable> {
+
+        private LongWritable valOut = new LongWritable();
+
+        // TODO: can still add up to be large I guess
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 2;
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context 
context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            valOut.set(count);
+            context.write(key, valOut);
+        }
+
+    }
+
+    public static class CountPiecesReducer extends Reducer<Text, LongWritable, 
Text, Mutation> {
+
+        Text row = new Text();
+        Text cat_txt = new Text();
+        Value v_out = new Value();
+        ValueFactory vf = new ValueFactoryImpl();
+
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 10;
+        private String tablePrefix;
+        protected Text table;
+        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+            tablePrefix = 
context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, 
RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            table = new Text(tablePrefix + 
RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            final String cv_s = 
context.getConfiguration().get(MRUtils.AC_CV_PROP);
+            if (cv_s != null)
+                cv = new ColumnVisibility(cv_s);
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context 
context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
+            String v = badi.readUTF();
+            cat_txt.set(badi.readUTF());
+
+            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
+            boolean includesContext = badi.readBoolean();
+            if (includesContext) {
+                columnQualifier = new Text(badi.readUTF());
+            }
+
+            row.set(v);
+            Mutation m = new Mutation(row);
+            v_out.set((count + "").getBytes());
+            m.put(cat_txt, columnQualifier, cv, v_out);
+            context.write(table, m);
+        }
+
+    }
+}
\ No newline at end of file

Reply via email to