http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
new file mode 100644
index 0000000..2a09669
--- /dev/null
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
@@ -0,0 +1,318 @@
+package mvm.rya.accumulo.mr.upgrade;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import junit.framework.TestCase;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.query.RyaQuery;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.*;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Created by IntelliJ IDEA.
+ * Date: 4/25/12
+ * Time: 10:51 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class Upgrade322ToolTest extends TestCase {
+
+    private String user = "user";
+    private String pwd = "pwd";
+    private String instance = "myinstance";
+    private String tablePrefix = "t_";
+    private Authorizations auths = Constants.NO_AUTHS;
+    private Connector connector;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        final String spoTable = tablePrefix +
+                                RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
+        final String poTable = tablePrefix +
+                               RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
+        final String ospTable = tablePrefix +
+                                RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
+
+        connector = new MockInstance(instance).getConnector(user, pwd.getBytes());
+
+        connector.tableOperations().create(spoTable);
+        connector.tableOperations().create(poTable);
+        connector.tableOperations().create(ospTable);
+        connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
+        connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+        SecurityOperations secOps = connector.securityOperations();
+        secOps.createUser(user, pwd.getBytes(), auths);
+        secOps.grantTablePermission(user, spoTable, TablePermission.READ);
+        secOps.grantTablePermission(user, poTable, TablePermission.READ);
+        secOps.grantTablePermission(user, ospTable, TablePermission.READ);
+        secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ);
+        secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ);
+        secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE);
+
+        //load data
+        final BatchWriter ospWriter = connector
+          .createBatchWriter(ospTable, new BatchWriterConfig());
+        ospWriter.addMutation(getMutation("00000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u0001\u0004"));
+        ospWriter.addMutation(getMutation("00000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u0001\u0005"));
+        ospWriter.addMutation(getMutation("00000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u0001\t"));
+        ospWriter.addMutation(getMutation("00001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u0001\u0006"));
+        ospWriter.addMutation(getMutation("10\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" +
+        "://here/2010/tracked-data-provenance/ns#shortLit\u0001http://www.w3" +
+        ".org/2001/XMLSchema#short\u0001\b"));
+        ospWriter.addMutation(getMutation("10.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" +
+        "://www.w3.org/2001/XMLSchema#float\u0001\b"));
+        ospWriter.addMutation(getMutation("3.0.0\u0000urn:mvm.rya/2012/05#rts\u0000urn:mvm" +
+        ".rya/2012/05#version\u0001\u0003"));
+        ospWriter.addMutation(getMutation("9223370726404375807\u0000http://here/2010/tracked-data-provenance/ns" +
+        "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" +
+        "\u0001\u0007"));
+        ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#Created\u0000http://here" +
+        "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" +
+        ".org/1999/02/22-rdf-syntax-ns#type\u0001\u0002"));
+        ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http" +
+        "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" +
+        "/tracked-data-provenance/ns#uriLit\u0001\u0002"));
+        ospWriter.addMutation(getMutation("stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0001" +
+        "\u0003"));
+        ospWriter.addMutation(getMutation("true\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
+        "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0001\n"));
+        ospWriter.flush();
+        ospWriter.close();
+
+        final BatchWriter spoWriter = connector
+          .createBatchWriter(spoTable, new BatchWriterConfig());
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0001\u0004"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0001\u0005"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0001\t"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0001\u0006"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" +
+                                          "://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0001http://www.w3" +
+                                          ".org/2001/XMLSchema#short\u0001\b"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" +
+                                          "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b"));
+        spoWriter.addMutation(getMutation("urn:mvm.rya/2012/05#rts\u0000urn:mvm" +
+                                          ".rya/2012/05#version\u00003.0.0\u0001\u0003"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns" +
+                                          "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" +
+                                          "\u00009223370726404375807\u0001\u0007"));
+        spoWriter.addMutation(getMutation("http://here" +
+                                          "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" +
+                                          ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0001\u0002"));
+        spoWriter.addMutation(getMutation("http" +
+                                          "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" +
+                                          "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0001\u0002"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0001" +
+                                          "\u0003"));
+        spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
+                                          "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0001\n"));
+        spoWriter.flush();
+        spoWriter.close();
+
+        final BatchWriter poWriter = connector
+          .createBatchWriter(poTable, new BatchWriterConfig());
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0004"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0005"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\t"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0006"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http://www.w3" +
+                                          ".org/2001/XMLSchema#short\u0001\b"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#floatLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http" +
+                                          "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b"));
+        poWriter.addMutation(getMutation("urn:mvm" +
+                                          ".rya/2012/05#version\u00003.0.0\u0000urn:mvm.rya/2012/05#rts\u0001\u0003"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#dateLit" +
+                                          "\u00009223370726404375807\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0007"));
+        poWriter.addMutation(getMutation("http://www.w3" +
+                                          ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002"));
+        poWriter.addMutation(getMutation("http://here/2010" +
+                                          "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001" +
+                                          "\u0003"));
+        poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\n"));
+        poWriter.flush();
+        poWriter.close();
+    }
+
+    public Mutation getMutation(String row) {
+        final Mutation mutation = new Mutation(row);
+        mutation.put("", "", "");
+        return mutation;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        connector.tableOperations().delete(
+          tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+        connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+        connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
+        connector.tableOperations().delete(
+          tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+    }
+
+    public void testUpgrade() throws Exception {
+        Upgrade322Tool.main(new String[]{
+                "-Dac.mock=true",
+                "-Dac.instance=" + instance,
+                "-Dac.username=" + user,
+                "-Dac.pwd=" + pwd,
+                "-Drdf.tablePrefix=" + tablePrefix,
+        });
+
+        final AccumuloRdfConfiguration configuration = new AccumuloRdfConfiguration();
+        configuration.setTablePrefix(tablePrefix);
+        final AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+        ryaDAO.setConnector(connector);
+        ryaDAO.setConf(configuration);
+        ryaDAO.init();
+
+        final AccumuloRyaQueryEngine queryEngine = ryaDAO.getQueryEngine();
+
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#booleanLit"),
+          new RyaType(XMLSchema.BOOLEAN, "true")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#longLit"),
+          new RyaType(XMLSchema.LONG, "10")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#intLit"),
+          new RyaType(XMLSchema.INTEGER, "10")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#byteLit"),
+          new RyaType(XMLSchema.BYTE, "10")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#doubleLit"),
+          new RyaType(XMLSchema.DOUBLE, "10.0")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#dateLit"),
+          new RyaType(XMLSchema.DATETIME, "2011-07-12T06:00:00.000Z")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#stringLit"),
+          new RyaType("stringLit")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns#uriLit"),
+          new RyaURI("http://here/2010/tracked-data-provenance/ns" +
+                     "#objectuuid1")), queryEngine);
+        verify(new RyaStatement(
+          new RyaURI("urn:mvm.rya/2012/05#rts"),
+          new RyaURI("urn:mvm.rya/2012/05#version"),
+          new RyaType("3.0.0")), queryEngine);
+    }
+
+    private void verify(RyaStatement ryaStatement, AccumuloRyaQueryEngine queryEngine)
+      throws RyaDAOException, IOException {
+
+        //check osp
+        CloseableIterable<RyaStatement> statements =
+          queryEngine.query(RyaQuery.builder(new RyaStatement(null, null, ryaStatement.getObject()))
+                                    .build());
+        try {
+            verifyFirstStatement(ryaStatement, statements);
+        } finally {
+            statements.close();
+        }
+
+        //check po
+        statements = queryEngine.query(RyaQuery.builder(
+          new RyaStatement(null, ryaStatement.getPredicate(),
+                           ryaStatement.getObject())).build());
+        try {
+            verifyFirstStatement(ryaStatement, statements);
+        } finally {
+            statements.close();
+        }
+
+        //check spo
+        statements = queryEngine.query(RyaQuery.builder(
+          new RyaStatement(ryaStatement.getSubject(),
+                           ryaStatement.getPredicate(),
+                           ryaStatement.getObject())).build());
+        try {
+            verifyFirstStatement(ryaStatement, statements);
+        } finally {
+            statements.close();
+        }
+    }
+
+    private void verifyFirstStatement(
+      RyaStatement ryaStatement, CloseableIterable<RyaStatement> statements) {
+        final Iterator<RyaStatement> iterator = statements.iterator();
+        assertTrue(iterator.hasNext());
+        final RyaStatement first = iterator.next();
+        assertEquals(ryaStatement.getSubject(), first.getSubject());
+        assertEquals(ryaStatement.getPredicate(), first.getPredicate());
+        assertEquals(ryaStatement.getObject(), first.getObject());
+        assertFalse(iterator.hasNext());
+    }
+
+    public void printTableData(String tableName)
+      throws TableNotFoundException{
+        Scanner scanner = connector.createScanner(tableName, auths);
+        scanner.setRange(new Range());
+        for(Map.Entry<Key, Value> entry : scanner) {
+            final Key key = entry.getKey();
+            final Value value = entry.getValue();
+            System.out.println(key.getRow() + " " + key.getColumnFamily() + " " + key.getColumnQualifier() + " " + key.getTimestamp() + " " + value.toString());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java
new file mode 100644
index 0000000..027bd7e
--- /dev/null
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java
@@ -0,0 +1,118 @@
+package mvm.rya.accumulo.mr.upgrade;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mvm.rya.api.resolver.impl.*;
+import org.junit.Test;
+
+import static mvm.rya.accumulo.mr.upgrade.Upgrade322Tool.UpgradeObjectSerialization;
+import static org.junit.Assert.*;
+
+public class UpgradeObjectSerializationTest {
+
+    @Test
+    public void testBooleanUpgrade() throws Exception {
+        String object = "true";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, BooleanRyaTypeResolver.BOOLEAN_LITERAL_MARKER);
+
+        assertEquals("1", upgrade);
+    }
+
+    @Test
+    public void testBooleanUpgradeFalse() throws Exception {
+        String object = "false";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, BooleanRyaTypeResolver.BOOLEAN_LITERAL_MARKER);
+
+        assertEquals("0", upgrade);
+    }
+
+    @Test
+    public void testByteUpgradeLowest() throws Exception {
+        String object = "-127";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, ByteRyaTypeResolver.LITERAL_MARKER);
+
+        assertEquals("81", upgrade);
+    }
+
+    @Test
+    public void testByteUpgradeHighest() throws Exception {
+        String object = "127";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, ByteRyaTypeResolver.LITERAL_MARKER);
+
+        assertEquals("7f", upgrade);
+    }
+
+    @Test
+    public void testLongUpgrade() throws Exception {
+        String object = "00000000000000000010";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, LongRyaTypeResolver.LONG_LITERAL_MARKER);
+
+        assertEquals("800000000000000a", upgrade);
+    }
+
+    @Test
+    public void testIntUpgrade() throws Exception {
+        String object = "00000000010";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, IntegerRyaTypeResolver.INTEGER_LITERAL_MARKER);
+
+        assertEquals("8000000a", upgrade);
+    }
+
+    @Test
+    public void testDateTimeUpgrade() throws Exception {
+        String object = "9223370726404375807";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, DateTimeRyaTypeResolver.DATETIME_LITERAL_MARKER);
+
+        assertEquals("800001311cee3b00", upgrade);
+    }
+
+    @Test
+    public void testDoubleUpgrade() throws Exception {
+        String object = "00001 1.0";
+        final UpgradeObjectSerialization upgradeObjectSerialization
+          = new UpgradeObjectSerialization();
+        final String upgrade = upgradeObjectSerialization
+          .upgrade(object, DoubleRyaTypeResolver.DOUBLE_LITERAL_MARKER);
+
+        assertEquals("c024000000000000", upgrade);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/resources/namedgraphs.trig
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/resources/namedgraphs.trig b/dao/accumulo.rya/src/test/resources/namedgraphs.trig
new file mode 100644
index 0000000..b647632
--- /dev/null
+++ b/dao/accumulo.rya/src/test/resources/namedgraphs.trig
@@ -0,0 +1,7 @@
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix swp: <http://www.w3.org/2004/03/trix/swp-1/> .
+@prefix dc: <http://purl.org/dc/elements/1.1/> .
+@prefix ex: <http://www.example.org/vocabulary#> .
+@prefix : <http://www.example.org/exampleDocument#> .
+:G1 { :Monica ex:name "Monica Murphy" . }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/resources/test.ntriples
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/resources/test.ntriples b/dao/accumulo.rya/src/test/resources/test.ntriples
new file mode 100644
index 0000000..26a0a17
--- /dev/null
+++ b/dao/accumulo.rya/src/test/resources/test.ntriples
@@ -0,0 +1 @@
+<urn:lubm:rdfts#GraduateStudent01> <urn:lubm:rdfts#hasFriend> 
<urn:lubm:rdfts#GraduateStudent02> .
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/pom.xml b/dao/cloudbase.rya/pom.xml
new file mode 100644
index 0000000..5909e9d
--- /dev/null
+++ b/dao/cloudbase.rya/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>rya.dao</artifactId>
+        <version>3.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>cloudbase.rya</artifactId>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <dependencies>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>rya.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>cloudbase.utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>cloudbase.iterators</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Cloudbase deps -->
+        <dependency>
+            <groupId>cloudbase</groupId>
+            <artifactId>cloudbase-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <exclusions>
+                <!-- the log4j that comes with zookeeper 3.3.5 has some bad dependencies -->
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.texeltek</groupId>
+            <artifactId>accumulo-cloudbase-shim</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>cloudbase.iterators</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+
+        <!-- Sesame runtime -->
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-ntriples</artifactId>
+            <version>${openrdf.sesame.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>mr</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-shade-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <configuration>
+                                    <transformers>
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    </transformers>
+                                </configuration>
+                            </execution>
+                        </executions>
+
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java
new file mode 100644
index 0000000..7980d85
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java
@@ -0,0 +1,59 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+
+/**
+ * The intention of this iterator is to wrap the iterator that is returned by a
+ * BatchScan in cloudbase in order to serve as a workaround for
+ * ACCUMULO-226 (https://issues.apache.org/jira/browse/ACCUMULO-226).  The bug
+ * involves subsequent calls to hasNext() on batch scan results after false has been
+ * returned will return true
+ * <p/>
+ * A patch has been submitted and accepted in Accumulo but this wrapper can be used
+ * for previous versions of Cloudbase/Accumulo that do not yet have the patch.
+ */
+public class BatchScannerIterator implements Iterator<Entry<Key, Value>> {
+
+    private Iterator<Entry<Key, Value>> cloudbaseScanner = null;
+
+    private Entry<Key, Value> nextKeyValue = null;
+
+    public BatchScannerIterator(Iterator<Entry<Key, Value>> cloudbaseScanner) {
+        this.cloudbaseScanner = cloudbaseScanner;
+    }
+
+    public boolean hasNext() {
+        if (nextKeyValue == null) {
+            if (cloudbaseScanner.hasNext()) {
+                nextKeyValue = cloudbaseScanner.next();
+            }
+        }
+        return !isTerminatingKeyValue(nextKeyValue);
+    }
+
+    private boolean isTerminatingKeyValue(Entry<Key, Value> nextEntry) {
+        if (nextEntry == null) {
+            return true;
+        }
+        return !(nextEntry.getKey() != null && nextEntry.getValue() != null); //Condition taken from cloudbase's TabletServerBatchReaderIterator
+    }
+
+    public Entry<Key, Value> next() {
+        if (hasNext()) {
+            Entry<Key, Value> entry = nextKeyValue;
+            nextKeyValue = null;
+            return entry;
+        } else {
+            throw new NoSuchElementException();
+        }
+    }
+
+    public void remove() {
+        cloudbaseScanner.remove();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java
new file mode 100644
index 0000000..b20d79c
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java
@@ -0,0 +1,78 @@
package mvm.rya.cloudbase;

import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import com.google.common.base.Preconditions;
import info.aduna.iteration.CloseableIteration;
import mvm.rya.api.persist.RdfDAOException;
import org.openrdf.model.Namespace;
import org.openrdf.model.impl.NamespaceImpl;

import java.io.IOError;
import java.util.Iterator;
import java.util.Map.Entry;

/**
 * Adapts a scan over the namespace table into a Sesame CloseableIteration of
 * {@link Namespace} objects. Each table entry's row is the namespace prefix
 * and its value bytes are the namespace URI.
 */
public class CloudbaseNamespaceTableIterator<T extends Namespace> implements
        CloseableIteration<Namespace, RdfDAOException> {

    private boolean open = false;
    private Iterator<Entry<Key, Value>> result;

    /**
     * @param result scan results over the namespace table; must not be null
     * @throws RdfDAOException declared for interface symmetry
     */
    public CloudbaseNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException {
        Preconditions.checkNotNull(result);
        open = true;
        this.result = result;
    }

    /** Marks the iteration closed; throws if it was already closed. */
    @Override
    public void close() throws RdfDAOException {
        try {
            verifyIsOpen();
            open = false;
        } catch (IOError e) {
            throw new RdfDAOException(e);
        }
    }

    public void verifyIsOpen() throws RdfDAOException {
        if (!open) {
            throw new RdfDAOException("Iterator not open");
        }
    }

    @Override
    public boolean hasNext() throws RdfDAOException {
        verifyIsOpen();
        return result != null && result.hasNext();
    }

    /** Returns the next namespace, or null when the scan is exhausted. */
    @Override
    public Namespace next() throws RdfDAOException {
        if (hasNext()) {
            return getNamespace(result);
        }
        return null;
    }

    /**
     * Converts the next scan entry into a Namespace (row = prefix,
     * value bytes = namespace URI), or returns null if no entry remains.
     */
    public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) {
        if (rowResults.hasNext()) {
            Entry<Key, Value> next = rowResults.next();
            // NOTE(review): new String(bytes) uses the platform charset —
            // presumably the table was written the same way; verify.
            return new NamespaceImpl(next.getKey().getRow().toString(),
                    new String(next.getValue().get()));
        }
        return null;
    }

    @Override
    public void remove() throws RdfDAOException {
        // NOTE(review): this skips an element rather than removing one from
        // the backing store — confirm this is the intended semantics before
        // relying on it.
        next();
    }

    public boolean isOpen() {
        return open;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java
new file mode 100644
index 0000000..e25c910
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java
@@ -0,0 +1,44 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.security.Authorizations;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Created by IntelliJ IDEA.
+ * Date: 4/25/12
+ * Time: 3:24 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class CloudbaseRdfConfiguration extends 
RdfCloudTripleStoreConfiguration {
+
+    public static final String MAXRANGES_SCANNER = "cb.query.maxranges";
+
+    public CloudbaseRdfConfiguration() {
+        super();
+    }
+
+    public CloudbaseRdfConfiguration(Configuration other) {
+        super(other);
+    }
+
+    @Override
+    public CloudbaseRdfConfiguration clone() {
+        return new CloudbaseRdfConfiguration(this);
+    }
+
+    public Authorizations getAuthorizations() {
+        String[] auths = getAuths();
+        if (auths == null || auths.length == 0)
+            return CloudbaseRdfConstants.ALL_AUTHORIZATIONS;
+        return new Authorizations(auths);
+    }
+
+    public void setMaxRangesForScanner(Integer max) {
+        setInt(MAXRANGES_SCANNER, max);
+    }
+
+    public Integer getMaxRangesForScanner() {
+        return getInt(MAXRANGES_SCANNER, 2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java
new file mode 100644
index 0000000..690a050
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java
@@ -0,0 +1,20 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.Authorizations;
+import cloudbase.core.security.ColumnVisibility;
+
+/**
+ * Interface CloudbaseRdfConstants
+ * Date: Mar 1, 2012
+ * Time: 7:24:52 PM
+ */
+public interface CloudbaseRdfConstants {
+    public static final Authorizations ALL_AUTHORIZATIONS = 
CBConstants.NO_AUTHS;
+
+    public static final Value EMPTY_VALUE = new Value(new byte[0]);
+
+    public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new 
byte[0]);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java
new file mode 100644
index 0000000..075d1fe
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java
@@ -0,0 +1,138 @@
package mvm.rya.cloudbase;

import cloudbase.core.client.Connector;
import cloudbase.core.client.Scanner;
import cloudbase.core.client.admin.TableOperations;
import cloudbase.core.data.Key;
import cloudbase.core.data.Range;
import cloudbase.core.security.Authorizations;
import mvm.rya.api.layout.TableLayoutStrategy;
import mvm.rya.api.persist.RdfDAOException;
import mvm.rya.api.persist.RdfEvalStatsDAO;
import org.apache.hadoop.io.Text;
import org.openrdf.model.Resource;
import org.openrdf.model.Value;

import java.util.Iterator;
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;
import static mvm.rya.api.RdfCloudTripleStoreConstants.*;

/**
 * DAO that reads precomputed cardinality statistics for query-evaluation
 * planning from the cloudbase "eval" table.
 */
public class CloudbaseRdfEvalStatsDAO implements RdfEvalStatsDAO<CloudbaseRdfConfiguration> {

    private boolean initialized = false;
    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();

    private Connector connector;
    private TableLayoutStrategy tableLayoutStrategy;

    /**
     * Resolves the eval table name from the layout strategy and creates the
     * table if it does not exist. A connector must have been set first.
     *
     * @throws RdfDAOException if already initialized or table setup fails
     */
    @Override
    public void init() throws RdfDAOException {
        try {
            if (isInitialized()) {
                throw new IllegalStateException("Already initialized");
            }
            checkNotNull(connector);
            tableLayoutStrategy = conf.getTableLayoutStrategy();
            TableOperations tos = connector.tableOperations();
            CloudbaseRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval());
            initialized = true;
        } catch (Exception e) {
            throw new RdfDAOException(e);
        }
    }

    /** Convenience overload without a context restriction. */
    @Override
    public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val) throws RdfDAOException {
        return this.getCardinality(conf, card, val, null);
    }

    /**
     * Looks up the stored cardinality for the given value, optionally scoped
     * to a context (stored in the column qualifier).
     *
     * @return the stored count; Double.MAX_VALUE for OBJECT cardinality
     *         (not computed yet); -1 when no statistic is stored
     * @throws RdfDAOException on any scan failure
     */
    @Override
    public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val, Resource context) throws RdfDAOException {
        try {
            Authorizations authorizations = conf.getAuthorizations();
            Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations);
            Text cfTxt = null;
            if (CARDINALITY_OF.SUBJECT.equals(card)) {
                cfTxt = SUBJECT_CF_TXT;
            } else if (CARDINALITY_OF.PREDICATE.equals(card)) {
                cfTxt = PRED_CF_TXT;
            } else if (CARDINALITY_OF.OBJECT.equals(card)) {
                //TODO: object cardinality is not stored yet; treat as unbounded
                return Double.MAX_VALUE;
            } else {
                throw new IllegalArgumentException("Not right Cardinality[" + card + "]");
            }
            // Context-scoped statistics live under the context's column qualifier.
            Text cq = EMPTY_TEXT;
            if (context != null) {
                cq = new Text(context.stringValue().getBytes());
            }
            scanner.fetchColumn(cfTxt, cq);
            scanner.setRange(new Range(new Text(val.stringValue().getBytes())));
            Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator();
            if (iter.hasNext()) {
                return Double.parseDouble(new String(iter.next().getValue().get()));
            }
        } catch (Exception e) {
            throw new RdfDAOException(e);
        }

        //default when no statistic is stored for this value
        return -1;
    }

    @Override
    public void destroy() throws RdfDAOException {
        if (!isInitialized()) {
            throw new IllegalStateException("Not initialized");
        }
        initialized = false;
    }

    @Override
    public boolean isInitialized() throws RdfDAOException {
        return initialized;
    }

    public Connector getConnector() {
        return connector;
    }

    public void setConnector(Connector connector) {
        this.connector = connector;
    }

    public CloudbaseRdfConfiguration getConf() {
        return conf;
    }

    public void setConf(CloudbaseRdfConfiguration conf) {
        this.conf = conf;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java
new file mode 100644
index 0000000..9114ae8
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java
@@ -0,0 +1,50 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.client.CBException;
+import cloudbase.core.client.CBSecurityException;
+import cloudbase.core.client.TableExistsException;
+import cloudbase.core.client.admin.TableOperations;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import mvm.rya.api.resolver.triple.TripleRow;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
+
+/**
+ * Class CloudbaseRdfUtils
+ * Date: Mar 1, 2012
+ * Time: 7:15:54 PM
+ */
+public class CloudbaseRdfUtils {
+    private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class);
+
+    public static void createTableIfNotExist(TableOperations tableOperations, 
String tableName) throws TableExistsException, CBSecurityException, CBException 
{
+        boolean tableExists = tableOperations.exists(tableName);
+        if (!tableExists) {
+            logger.info("Creating cloudbase table: " + tableName);
+            tableOperations.create(tableName);
+        }
+    }
+
+    public static Key from(TripleRow tripleRow) {
+        return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES),
+                defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE));
+    }
+
+    public static Value extractValue(TripleRow tripleRow) {
+        return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES));
+    }
+
+    private static byte[] defaultTo(byte[] bytes, byte[] def) {
+        return bytes != null ? bytes : def;
+    }
+
+    private static Long defaultTo(Long l, Long def) {
+        return l != null ? l : def;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java
new file mode 100644
index 0000000..a3045e6
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java
@@ -0,0 +1,428 @@
package mvm.rya.cloudbase;

import cloudbase.core.client.*;
import cloudbase.core.client.Scanner;
import cloudbase.core.client.admin.TableOperations;
import cloudbase.core.client.impl.TabletServerBatchDeleter;
import cloudbase.core.conf.Property;
import cloudbase.core.data.Key;
import cloudbase.core.data.Mutation;
import cloudbase.core.data.Range;
import cloudbase.core.security.Authorizations;
import cloudbase.core.security.ColumnVisibility;
import com.google.common.collect.Iterators;
import info.aduna.iteration.CloseableIteration;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.layout.TableLayoutStrategy;
import mvm.rya.api.persist.RyaDAO;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.persist.RyaNamespaceManager;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;
import mvm.rya.cloudbase.query.CloudbaseRyaQueryEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.openrdf.model.Namespace;

import java.text.SimpleDateFormat;
import java.util.*;

import static com.google.common.base.Preconditions.checkNotNull;
import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
import static mvm.rya.cloudbase.CloudbaseRdfConstants.ALL_AUTHORIZATIONS;
import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV;

/**
 * Cloudbase-backed Rya DAO. Persists each RyaStatement into three index
 * tables (SPO, PO, OSP) via batch writers, manages the namespace table, and
 * exposes a query engine over the stored triples. Also doubles as the
 * namespace manager for this store.
 */
public class CloudbaseRyaDAO implements RyaDAO<CloudbaseRdfConfiguration>, RyaNamespaceManager<CloudbaseRdfConfiguration> {
    private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class);

    private boolean initialized = false;
    private Connector connector;

    // One batch writer per index table, plus one for the namespace table.
    private BatchWriter bw_spo;
    private BatchWriter bw_po;
    private BatchWriter bw_osp;
    private BatchWriter bw_ns;

    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
    // Column visibility applied to writes; empty unless configured via conf.getCv().
    private ColumnVisibility cv = EMPTY_CV;
    private RyaTableMutationsFactory ryaTableMutationsFactory = new RyaTableMutationsFactory();
    private TableLayoutStrategy tableLayoutStrategy;
    private CloudbaseRyaQueryEngine queryEngine;
    private RyaContext ryaContext = RyaContext.getInstance();

    @Override
    public boolean isInitialized() throws RyaDAOException {
        return initialized;
    }

    /**
     * Idempotent initialization: creates the SPO/PO/OSP/NS tables if absent,
     * opens a batch writer for each, builds the query engine, and writes the
     * version row if the store is new. Requires conf and connector to be set.
     *
     * @throws RyaDAOException wrapping any setup failure
     */
    @Override
    public void init() throws RyaDAOException {
        if (initialized)
            return;
        try {
            checkNotNull(conf);
            checkNotNull(connector);

            tableLayoutStrategy = conf.getTableLayoutStrategy();
            String cv_s = conf.getCv();
            if (cv_s != null) {
                cv = new ColumnVisibility(cv_s);
            }

            TableOperations tableOperations = connector.tableOperations();
            CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
            CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
            CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
            CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());

            //get the batch writers for tables
            // MAX_MEMORY/MAX_TIME/NUM_THREADS come from the static import of
            // RdfCloudTripleStoreConstants — presumably shared writer tuning;
            // note the namespace writer is restricted to a single thread.
            bw_spo = connector.createBatchWriter(tableLayoutStrategy.getSpo(), MAX_MEMORY, MAX_TIME,
                    NUM_THREADS);
            bw_po = connector.createBatchWriter(tableLayoutStrategy.getPo(), MAX_MEMORY, MAX_TIME,
                    NUM_THREADS);
            bw_osp = connector.createBatchWriter(tableLayoutStrategy.getOsp(), MAX_MEMORY, MAX_TIME,
                    NUM_THREADS);

            bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY,
                    MAX_TIME, 1);

            queryEngine = new CloudbaseRyaQueryEngine(connector, getConf());

            checkVersion();

            initialized = true;
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Reads the stored version string from the version row, or returns null
     * when no version row exists (i.e. a freshly created store).
     */
    public String getVersion() throws RyaDAOException {
        String version = null;
        CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
        if (versIter.hasNext()) {
            version = versIter.next().getObject().getData();
        }
        versIter.close();

        return version;
    }

    /** Adds a single statement to all three index tables and flushes. */
    @Override
    public void add(RyaStatement statement) throws RyaDAOException {
        commit(Iterators.singletonIterator(statement));
    }

    /** Adds a batch of statements to all three index tables and flushes. */
    @Override
    public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
        commit(iter);
    }

    @Override
    public void delete(RyaStatement stmt, CloudbaseRdfConfiguration aconf) throws RyaDAOException {
        this.delete(Iterators.singletonIterator(stmt), aconf);
    }

    /**
     * Deletes every stored statement matching each (possibly partial)
     * statement pattern: queries first, then issues delete mutations to all
     * three index tables and flushes.
     */
    @Override
    public void delete(Iterator<RyaStatement> statements, CloudbaseRdfConfiguration conf) throws RyaDAOException {
        try {
            while (statements.hasNext()) {
                RyaStatement stmt = statements.next();
                //query first
                CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
                while (query.hasNext()) {
                    deleteSingleRyaStatement(query.next());
                }
            }
            bw_spo.flush();
            bw_po.flush();
            bw_osp.flush();
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /** Queues delete mutations for one fully-bound statement in all three tables. */
    protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException {
        Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt);
        bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO)));
        bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO)));
        bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP)));
    }

    /**
     * Builds a delete mutation that mirrors the stored row exactly,
     * including its column visibility and timestamp.
     */
    protected Mutation deleteMutation(TripleRow tripleRow) {
        Mutation m = new Mutation(new Text(tripleRow.getRow()));

        byte[] columnFamily = tripleRow.getColumnFamily();
        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);

        byte[] columnQualifier = tripleRow.getColumnQualifier();
        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);

        m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), tripleRow.getTimestamp());
        return m;
    }

    /**
     * Serializes each statement into per-table mutations, queues them on the
     * three index batch writers, then flushes all three.
     */
    protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
        try {
            //TODO: Should have a lock here in case we are adding and committing at the same time
            while (commitStatements.hasNext()) {

                Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(commitStatements.next());
                Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
                Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
                Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
                bw_spo.addMutations(spo);
                bw_po.addMutations(po);
                bw_osp.addMutations(osp);
            }

            bw_spo.flush();
            bw_po.flush();
            bw_osp.flush();
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Flushes and closes all batch writers. No-op when not initialized.
     */
    @Override
    public void destroy() throws RyaDAOException {
        if (!initialized) {
            return;
        }
        //TODO: write lock
        try {
            initialized = false;
            bw_osp.flush();
            bw_spo.flush();
            bw_po.flush();
            bw_ns.flush();

            bw_osp.close();
            bw_spo.close();
            bw_po.close();
            bw_ns.close();
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Stores (or overwrites) a prefix -> namespace mapping in the namespace
     * table and flushes immediately.
     */
    @Override
    public void addNamespace(String pfx, String namespace) throws RyaDAOException {
        try {
            Mutation m = new Mutation(new Text(pfx));
            m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new cloudbase.core.data.Value(
                    namespace.getBytes()));
            bw_ns.addMutation(m);
            bw_ns.flush();
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Looks up the namespace stored for a prefix, or null when the prefix is
     * not mapped.
     */
    @Override
    public String getNamespace(String pfx) throws RyaDAOException {
        try {
            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                    ALL_AUTHORIZATIONS);
            scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
            scanner.setRange(new Range(new Text(pfx)));
            Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iterator = scanner
                    .iterator();

            if (iterator.hasNext()) {
                return new String(iterator.next().getValue().get());
            }
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
        return null;
    }

    /** Deletes the mapping for a prefix and flushes immediately. */
    @Override
    public void removeNamespace(String pfx) throws RyaDAOException {
        try {
            Mutation del = new Mutation(new Text(pfx));
            del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
            bw_ns.addMutation(del);
            bw_ns.flush();
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /** Iterates every prefix -> namespace mapping in the namespace table. */
    @Override
    public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
        try {
            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                    ALL_AUTHORIZATIONS);
            scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
            Iterator<Map.Entry<Key, cloudbase.core.data.Value>> result = scanner.iterator();
            return new CloudbaseNamespaceTableIterator(result);
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /** This DAO serves as its own namespace manager. */
    @Override
    public RyaNamespaceManager<CloudbaseRdfConfiguration> getNamespaceManager() {
        return this;
    }

    /**
     * Deletes all entries from every Rya table (best-effort: per-table errors
     * are logged, not rethrown), requests compaction, and re-writes the
     * version row if the DAO is still initialized.
     */
    @Override
    public void purge(RdfCloudTripleStoreConfiguration configuration) {
        for (String tableName : getTables()) {
            try {
                purge(tableName, configuration.getAuths());
                compact(tableName);
            } catch (TableNotFoundException e) {
                logger.error(e.getMessage());
            } catch (MutationsRejectedException e) {
                logger.error(e.getMessage());
            }
        }
        try {
            if (isInitialized()) {
                checkVersion();
            }
        } catch (RyaDAOException e) {
            logger.error("checkVersion() failed?", e);
        }
    }

    /**
     * Drops every Rya table (a missing table is only a warning; security or
     * connection errors abort) and then destroys this DAO.
     */
    @Override
    public void dropAndDestroy() throws RyaDAOException {
        for (String tableName : getTables()) {
            try {
                drop(tableName);
            } catch (CBSecurityException e) {
                logger.error(e.getMessage());
                throw new RyaDAOException(e);
            } catch (CBException e) {
                logger.error(e.getMessage());
                throw new RyaDAOException(e);
            } catch (TableNotFoundException e) {
                logger.warn(e.getMessage());
            }
        }
        destroy();
    }

    public Connector getConnector() {
        return connector;
    }

    public void setConnector(Connector connector) {
        this.connector = connector;
    }

    public CloudbaseRdfConfiguration getConf() {
        return conf;
    }

    public void setConf(CloudbaseRdfConfiguration conf) {
        this.conf = conf;
    }

    public RyaTableMutationsFactory getRyaTableMutationsFactory() {
        return ryaTableMutationsFactory;
    }

    public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
        this.ryaTableMutationsFactory = ryaTableMutationsFactory;
    }

    public CloudbaseRyaQueryEngine getQueryEngine() {
        return queryEngine;
    }

    public void setQueryEngine(CloudbaseRyaQueryEngine queryEngine) {
        this.queryEngine = queryEngine;
    }

    /** All tables managed by this DAO, per the configured layout strategy. */
    protected String[] getTables() {
        return new String[] {
                tableLayoutStrategy.getSpo()
                , tableLayoutStrategy.getPo()
                , tableLayoutStrategy.getOsp()
                , tableLayoutStrategy.getNs()
                , tableLayoutStrategy.getEval()
        };
    }

    /**
     * Deletes every row in the table visible under the given auths.
     * NOTE(review): the cast to TabletServerBatchDeleter suggests the
     * BatchDeleter interface in this cloudbase version has no close() —
     * confirm before upgrading the client library.
     */
    private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
        if (tableExists(tableName)) {
            logger.info("Purging cloudbase table: " + tableName);
            BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
            try {
                batchDeleter.setRanges(Collections.singleton(new Range()));
                batchDeleter.delete();
            } finally {
                ((TabletServerBatchDeleter)batchDeleter).close();
            }
        }
    }

    /**
     * Requests a full major compaction of the table by flushing it and
     * setting TABLE_MAJC_COMPACTALL_AT to "now". Skips the request when a
     * compaction is already scheduled for a future time. Errors are logged,
     * not rethrown.
     */
    private void compact(String tableName) {
        Date now = new Date(System.currentTimeMillis());
        SimpleDateFormat dateParser = new SimpleDateFormat("yyyyMMddHHmmssz", Locale.getDefault());
        String nowStr = dateParser.format(now);
        try {
            for (Map.Entry<String, String> prop : connector.tableOperations().getProperties(tableName)) {
                if (prop.getKey().equals(Property.TABLE_MAJC_COMPACTALL_AT.getKey())) {
                    // A compaction already scheduled in the future wins;
                    // otherwise fall through and (re)schedule one now.
                    if (dateParser.parse(prop.getValue()).after(now)) {
                        return;
                    } else {
                        break;
                    }
                }
            }

            connector.tableOperations().flush(tableName);
            logger.info("Requesting major compaction for table " + tableName);
            connector.tableOperations().setProperty(tableName, Property.TABLE_MAJC_COMPACTALL_AT.getKey(), nowStr);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    // NOTE(review): appears unused within this class — candidate for removal.
    private Authorizations getAuthorizations(String auth) {
        if (auth == null) {
            return new Authorizations();
        } else {
            String[] auths = auth.split(",");
            return new Authorizations(auths);
        }
    }

    private boolean tableExists(String tableName) {
        return getConnector().tableOperations().exists(tableName);
    }

    private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
        return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
    }

    /** Writes the version row when the store has none. */
    private void checkVersion() throws RyaDAOException {
        String version = getVersion();
        if (version == null) {
            this.add(getVersionRyaStatement());
        }
        //TODO: Do a version check here
    }

    /** The statement recording this store's Rya version. */
    protected RyaStatement getVersionRyaStatement() {
        return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
    }

    private void drop(String tableName) throws CBSecurityException, CBException, TableNotFoundException {
        logger.info("Dropping cloudbase table: " + tableName);
        connector.tableOperations().delete(tableName);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java
new file mode 100644
index 0000000..8869759
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java
@@ -0,0 +1,93 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static java.util.AbstractMap.SimpleEntry;
+import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV;
+import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE;
+
+public class RyaTableKeyValues {
+    public static final ColumnVisibility EMPTY_CV = new ColumnVisibility();
+    public static final Text EMPTY_CV_TEXT = new 
Text(EMPTY_CV.getExpression());
+
+    RyaContext instance = RyaContext.getInstance();
+
+    private RyaStatement stmt;
+    private Collection<Map.Entry<Key, Value>> spo = new 
ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> po = new 
ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> osp = new 
ArrayList<Map.Entry<Key, Value>>();
+
+    public RyaTableKeyValues(RyaStatement stmt) {
+        this.stmt = stmt;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getSpo() {
+        return spo;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getPo() {
+        return po;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getOsp() {
+        return osp;
+    }
+
+    public RyaTableKeyValues invoke() throws IOException {
+        /**
+         * TODO: If there are contexts, do we still replicate the information 
into the default graph as well
+         * as the named graphs?
+         */try {
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt);
+            TripleRow tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            byte[] columnVisibility = tripleRow.getColumnVisibility();
+            Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new 
Text(columnVisibility);
+            Long timestamp = tripleRow.getTimestamp();
+            timestamp = timestamp == null ? 0l : timestamp;
+            byte[] value = tripleRow.getValue();
+            Value v = value == null ? EMPTY_VALUE : new Value(value);
+            spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+            po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = 
rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+            osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "RyaTableKeyValues{" +
+                "statement=" + stmt +
+                ", spo=" + spo +
+                ", po=" + po +
+                ", o=" + osp +
+                '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java
new file mode 100644
index 0000000..ab9b37d
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java
@@ -0,0 +1,81 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Value;
+import cloudbase.core.security.ColumnVisibility;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV;
+import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE;
+
+public class RyaTableMutationsFactory {
+
+    RyaContext ryaContext = RyaContext.getInstance();
+
+    public RyaTableMutationsFactory() {
+    }
+
+    //TODO: Does this still need to be collections
+    public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
Collection<Mutation>> serialize(
+            RyaStatement stmt) throws IOException {
+
+        Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+        Collection<Mutation> po_muts = new ArrayList<Mutation>();
+        Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+        /**
+         * TODO: If there are contexts, do we still replicate the information 
into the default graph as well
+         * as the named graphs?
+         */
+        try {
+            Map<TABLE_LAYOUT, TripleRow> rowMap = 
ryaContext.serializeTriple(stmt);
+            TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+            spo_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+            po_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+            osp_muts.add(createMutation(tripleRow));
+        } catch (TripleRowResolverException fe) {
+            throw new IOException(fe);
+        }
+
+        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> 
mutations =
+                new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
Collection<Mutation>>();
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+        return mutations;
+    }
+
+    protected Mutation createMutation(TripleRow tripleRow) {
+        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+        byte[] columnVisibility = tripleRow.getColumnVisibility();
+        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new 
ColumnVisibility(columnVisibility);
+        Long timestamp = tripleRow.getTimestamp();
+        timestamp = timestamp == null ? 0l : timestamp;
+        byte[] value = tripleRow.getValue();
+        Value v = value == null ? EMPTY_VALUE : new Value(value);
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new 
Text(columnQualifier);
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new 
Text(columnFamily);
+
+        mutation.put(cfText,cqText, cv, timestamp, v);
+        return mutation;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
new file mode 100644
index 0000000..5c6e8cf
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
@@ -0,0 +1,350 @@
+package mvm.rya.cloudbase.mr.eval;
+
+import cloudbase.core.CBConstants;
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.iterators.FilteringIterator;
+import cloudbase.core.iterators.filter.AgeOffFilter;
+import cloudbase.core.security.Authorizations;
+import cloudbase.core.security.ColumnVisibility;
+import cloudbase.core.util.Pair;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.cloudbase.CloudbaseRdfConstants;
+import mvm.rya.cloudbase.mr.utils.MRUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+
+/**
+ * Count subject, predicate, object. Save in table
+ * Class RdfCloudTripleStoreCountTool
+ * Date: Apr 12, 2011
+ * Time: 10:39:40 AM
+ */
+public class CloudbaseRdfCountTool implements Tool {
+
+    public static final String TTL_PROP = "mvm.rya.cloudbase.sail.mr.eval.ttl";
+
+    private Configuration conf;
+
+    public static void main(String[] args) {
+        try {
+
+            ToolRunner.run(new Configuration(), new CloudbaseRdfCountTool(), 
args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * cloudbase props
+     */
+    private RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+    private String userName = "root";
+    private String pwd = "password";
+    private String instance = "stratus";
+    private String zk = "10.40.190.113:2181";
+    private Authorizations authorizations = CBConstants.NO_AUTHS;
+    private String ttl = null;
+
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
+
+        //conf
+        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
+        ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
+        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
+        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
+        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
+        boolean mock = conf.getBoolean(MRUtils.CB_MOCK_PROP, false);
+        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        if (tablePrefix != null)
+            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                conf.get(MRUtils.TABLE_LAYOUT_PROP, 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+
+        String auth = conf.get(MRUtils.CB_AUTH_PROP);
+        if (auth != null)
+            authorizations = new Authorizations(auth.split(","));
+
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        conf.set("io.sort.mb", "256");
+        Job job = new Job(conf);
+        job.setJarByClass(CloudbaseRdfCountTool.class);
+
+        //set ttl
+        ttl = conf.get(TTL_PROP);
+
+        // set up cloudbase input
+        job.setInputFormatClass(CloudbaseInputFormat.class);
+        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
+                RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, 
tablePrefix), authorizations);
+        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
+        Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, 
Text>>();
+        //TODO: What about named graphs/contexts here?
+//        final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, 
RdfCloudTripleStoreConstants.INFO_TXT);
+//        columns.add(pair);
+//        CloudbaseInputFormat.fetchColumns(job, columns);
+        if (ttl != null) {
+            CloudbaseInputFormat.setIterator(job, 1, 
FilteringIterator.class.getName(), "filteringIterator");
+            CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", 
"0", AgeOffFilter.class.getName());
+            CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", 
"0.ttl", ttl);
+        }
+
+        CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new 
Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        // set mapper and reducer classes
+        job.setMapperClass(CountPiecesMapper.class);
+        job.setCombinerClass(CountPiecesCombiner.class);
+        job.setReducerClass(CountPiecesReducer.class);
+
+        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), 
true, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return 0;
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    @Override
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public String getInstance() {
+        return instance;
+    }
+
+    public void setInstance(String instance) {
+        this.instance = instance;
+    }
+
+    public String getPwd() {
+        return pwd;
+    }
+
+    public void setPwd(String pwd) {
+        this.pwd = pwd;
+    }
+
+    public String getZk() {
+        return zk;
+    }
+
+    public void setZk(String zk) {
+        this.zk = zk;
+    }
+
+    public String getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(String ttl) {
+        this.ttl = ttl;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public static class CountPiecesMapper extends Mapper<Key, Value, Text, 
LongWritable> {
+
+        public static final byte[] EMPTY_BYTES = new byte[0];
+        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+
+        ValueFactoryImpl vf = new ValueFactoryImpl();
+
+        private Text keyOut = new Text();
+        private LongWritable valOut = new LongWritable(1);
+        private RyaContext ryaContext = RyaContext.getInstance();
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                    conf.get(MRUtils.TABLE_LAYOUT_PROP, 
RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws 
IOException, InterruptedException {
+            try {
+                RyaStatement statement = 
ryaContext.deserializeTriple(tableLayout, new 
TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), 
key.getColumnQualifier().getBytes()));
+                //count each piece subject, pred, object
+
+                String subj = statement.getSubject().getData();
+                String pred = statement.getPredicate().getData();
+//                byte[] objBytes = 
tripleFormat.getValueFormat().serialize(statement.getObject());
+                RyaURI scontext = statement.getContext();
+                boolean includesContext = scontext != null;
+                String scontext_str = (includesContext) ? scontext.getData() : 
null;
+
+                ByteArrayDataOutput output = ByteStreams.newDataOutput();
+                output.writeUTF(subj);
+                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+
+                output = ByteStreams.newDataOutput();
+                output.writeUTF(pred);
+                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+
+
+                //TODO: Obj in eval stats table?
+//                output = ByteStreams.newDataOutput();
+//                output.write(objBytes);
+//                output.writeByte(RdfCloudTripleStoreConstants.DELIM_BYTE);
+//                output.writeUTF(RdfCloudTripleStoreConstants.OBJ_CF);
+//                output.writeBoolean(includesContext);
+//                if (includesContext)
+//                    output.write(scontext_bytes);
+//                keyOut.set(output.toByteArray());
+//                context.write(keyOut, valOut);
+            } catch (TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public static class CountPiecesCombiner extends Reducer<Text, 
LongWritable, Text, LongWritable> {
+
+        private LongWritable valOut = new LongWritable();
+
+        // TODO: can still add up to be larger I guess
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 2;
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context 
context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            valOut.set(count);
+            context.write(key, valOut);
+        }
+
+    }
+
+    public static class CountPiecesReducer extends Reducer<Text, LongWritable, 
Text, Mutation> {
+
+        Text row = new Text();
+        Text cat_txt = new Text();
+        Value v_out = new Value();
+        ValueFactory vf = new ValueFactoryImpl();
+
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 10;
+        private String tablePrefix;
+        protected Text table;
+        private ColumnVisibility cv = CloudbaseRdfConstants.EMPTY_CV;
+
+        @Override
+        protected void setup(Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+            tablePrefix = 
context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, 
RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            table = new Text(tablePrefix + 
RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            final String cv_s = 
context.getConfiguration().get(MRUtils.CB_CV_PROP);
+            if (cv_s != null)
+                cv = new ColumnVisibility(cv_s);
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context 
context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
+            String v = badi.readUTF();
+            cat_txt.set(badi.readUTF());
+
+            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
+            boolean includesContext = badi.readBoolean();
+            if (includesContext) {
+                columnQualifier = new Text(badi.readUTF());
+            }
+
+            row.set(v);
+            Mutation m = new Mutation(row);
+            v_out.set((count + "").getBytes());
+            m.put(cat_txt, columnQualifier, cv, v_out);
+            context.write(table, m);
+        }
+
+    }
+}
\ No newline at end of file

Reply via email to