http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java
new file mode 100644
index 0000000..d3f9ee5
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java
@@ -0,0 +1,202 @@
+package mvm.rya.mongodb;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.mongodb.dao.MongoDBNamespaceManager;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+import mvm.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+
+import org.apache.commons.io.IOUtils;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.InsertOptions;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
+
+public class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
+
+       
+       private MongoDBRdfConfiguration conf;
+       private MongoClient mongoClient;
+       private DB db;
+       private DBCollection coll;
+       private MongoDBQueryEngine queryEngine;
+       private MongoDBStorageStrategy storageStrategy;
+       private MongoDBNamespaceManager nameSpaceManager;
+       private MongodForTestsFactory testsFactory;
+       
+       private List<RyaSecondaryIndexer> secondaryIndexers;
+       
+       public MongoDBRyaDAO(MongoDBRdfConfiguration conf) throws 
RyaDAOException{
+               this.conf = conf;
+               init();
+       }
+
+       public void setConf(MongoDBRdfConfiguration conf) {
+               this.conf = conf;
+       }
+
+    public MongoDBRdfConfiguration getConf() {
+        return conf;
+    }
+
+    public void init() throws RyaDAOException {
+        try {
+            boolean useMongoTest = conf.getUseTestMongo();
+            if (useMongoTest) {
+                testsFactory = 
MongodForTestsFactory.with(Version.Main.PRODUCTION);
+                mongoClient = testsFactory.newMongo();
+                int port = mongoClient.getServerAddressList().get(0).getPort();
+                conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, 
Integer.toString(port));
+            } else {
+                ServerAddress server = new 
ServerAddress(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE),
+                        
Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT)));
+                if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) {
+                    MongoCredential cred = MongoCredential.createCredential(
+                            conf.get(MongoDBRdfConfiguration.MONGO_USER),
+                            
conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD),
+                            
conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME).toCharArray());
+                    mongoClient = new MongoClient(server, Arrays.asList(cred));
+                } else {
+                    mongoClient = new MongoClient(server);
+                }
+            }
+            secondaryIndexers = conf.getAdditionalIndexers();
+            for(RyaSecondaryIndexer index: secondaryIndexers) {
+                index.setConf(conf);
+            }
+            
+            db = 
mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
+            coll = db.getCollection(conf.getTriplesCollectionName());
+            nameSpaceManager = new 
SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName()));
+            queryEngine = new MongoDBQueryEngine(conf);
+            storageStrategy = new SimpleMongoDBStorageStrategy();
+            storageStrategy.createIndices(coll);
+
+        } catch (UnknownHostException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+    }
+
+    public boolean isInitialized() throws RyaDAOException {
+        return true;
+    }
+
+    public void destroy() throws RyaDAOException {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+        if (conf.getUseTestMongo()) {
+            testsFactory.shutdown();
+        }
+
+        IOUtils.closeQuietly(queryEngine);
+    }
+
+       public void add(RyaStatement statement) throws RyaDAOException {        
        
+               // add it to the collection
+               try {
+                       coll.insert(storageStrategy.serialize(statement));
+                       for(RyaSecondaryIndexer index: secondaryIndexers) {
+                           index.storeStatement(statement);
+                       }
+               }
+               catch (com.mongodb.MongoException.DuplicateKey exception){
+                       // ignore
+               }
+               catch (com.mongodb.DuplicateKeyException exception){
+                       // ignore
+               }
+               catch (Exception ex){
+                       // ignore single exceptions
+                       ex.printStackTrace();
+               }
+       }
+       
+       public void add(Iterator<RyaStatement> statement) throws 
RyaDAOException {
+               List<DBObject> dbInserts = new ArrayList<DBObject>();
+               while (statement.hasNext()){
+                       RyaStatement ryaStatement = statement.next();
+                       DBObject insert = 
storageStrategy.serialize(ryaStatement);
+                       dbInserts.add(insert);
+                       
+            try {
+                for (RyaSecondaryIndexer index : secondaryIndexers) {
+                    index.storeStatement(ryaStatement);
+                }
+            } catch (IOException e) {
+                throw new RyaDAOException(e);
+            }
+            
+               }
+               coll.insert(dbInserts, new 
InsertOptions().continueOnError(true));
+       }
+
+       public void delete(RyaStatement statement, MongoDBRdfConfiguration conf)
+                       throws RyaDAOException {
+               DBObject obj = storageStrategy.serialize(statement);
+               coll.remove(obj);
+       }
+
+       public void dropGraph(MongoDBRdfConfiguration conf, RyaURI... graphs)
+                       throws RyaDAOException {
+               
+       }
+
+       public void delete(Iterator<RyaStatement> statements,
+                       MongoDBRdfConfiguration conf) throws RyaDAOException {
+               while (statements.hasNext()){
+                       RyaStatement ryaStatement = statements.next();
+                       coll.remove(storageStrategy.serialize(ryaStatement));
+               }
+               
+       }
+
+       public String getVersion() throws RyaDAOException {
+               return "1.0";
+       }
+
+       public RyaQueryEngine<MongoDBRdfConfiguration> getQueryEngine() {
+               return queryEngine;
+       }
+
+       public RyaNamespaceManager<MongoDBRdfConfiguration> 
getNamespaceManager() {
+               return nameSpaceManager;
+       }
+
+       public void purge(RdfCloudTripleStoreConfiguration configuration) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       public void dropAndDestroy() throws RyaDAOException {
+               db.dropDatabase(); // this is dangerous!
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java
new file mode 100644
index 0000000..270b57f
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java
@@ -0,0 +1,15 @@
+package mvm.rya.mongodb.dao;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
/**
 * Namespace manager backed by a MongoDB collection. Extends the generic
 * {@link RyaNamespaceManager} contract with the ability to create the Mongo
 * indexes needed by the namespace collection.
 */
public interface MongoDBNamespaceManager extends RyaNamespaceManager<MongoDBRdfConfiguration>{

	/**
	 * Creates the indexes this manager requires on the given namespace collection.
	 *
	 * @param coll the Mongo collection holding prefix/namespace documents
	 */
	public void createIndices(DBCollection coll);

}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java
new file mode 100644
index 0000000..093f2dd
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java
@@ -0,0 +1,21 @@
+package mvm.rya.mongodb.dao;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.query.RyaQuery;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
/**
 * Strategy for translating between {@link RyaStatement}s and the MongoDB
 * documents that store them, plus the query documents used to look them up.
 */
public interface MongoDBStorageStrategy {

	/**
	 * Builds a Mongo query document matching the non-null components
	 * (subject/predicate/object/context) of the given statement.
	 */
	public DBObject getQuery(RyaStatement stmt);

	/** Reconstructs a {@link RyaStatement} from a stored Mongo document. */
	public RyaStatement deserializeDBObject(DBObject queryResult);

	/** Serializes a statement into the Mongo document form used for storage. */
	public DBObject serialize(RyaStatement statement);

	/** Builds a Mongo query document for the statement pattern inside a {@link RyaQuery}. */
	public DBObject getQuery(RyaQuery ryaQuery);

	/** Creates the indexes this strategy requires on the triples collection. */
	public void createIndices(DBCollection coll);

}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
new file mode 100644
index 0000000..1847b94
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
@@ -0,0 +1,169 @@
+package mvm.rya.mongodb.dao;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+import org.apache.commons.codec.binary.Hex;
+import org.openrdf.model.Namespace;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+public class SimpleMongoDBNamespaceManager implements MongoDBNamespaceManager {
+
+       public class NamespaceImplementation implements Namespace {
+
+               private String namespace;
+               private String prefix;
+
+               public NamespaceImplementation(String namespace, String prefix) 
{
+                       this.namespace = namespace;
+                       this.prefix = prefix;
+               }
+
+               @Override
+               public int compareTo(Namespace o) {
+                       if (!namespace.equalsIgnoreCase(o.getName())) return 
namespace.compareTo(o.getName());
+                       if (!prefix.equalsIgnoreCase(o.getPrefix())) return 
prefix.compareTo(o.getPrefix());
+                       return 0;
+               }
+
+               @Override
+               public String getName() {
+                       return namespace;
+               }
+
+               @Override
+               public String getPrefix() {
+                       return prefix;
+               }
+
+       }
+
+       public class MongoCursorIteration implements
+                       CloseableIteration<Namespace, RyaDAOException> {
+               private DBCursor cursor;
+
+               public MongoCursorIteration(DBCursor cursor2) {
+                       this.cursor = cursor2;
+               }
+
+               @Override
+               public boolean hasNext() throws RyaDAOException {
+                       return cursor.hasNext();
+               }
+
+               @Override
+               public Namespace next() throws RyaDAOException {
+                       DBObject ns = cursor.next();
+                       Map values = ns.toMap();
+                       String namespace = (String) values.get(NAMESPACE);
+                       String prefix = (String) values.get(PREFIX);
+                       
+                       Namespace temp =  new 
NamespaceImplementation(namespace, prefix);
+                       return temp;
+               }
+
+               @Override
+               public void remove() throws RyaDAOException {
+                       next();
+               }
+
+               @Override
+               public void close() throws RyaDAOException {
+                       cursor.close();
+               }
+
+       }
+
+       private static final String ID = "_id";
+       private static final String PREFIX = "prefix";
+       private static final String NAMESPACE = "namespace";
+       private MongoDBRdfConfiguration conf;
+       private DBCollection nsColl;
+
+
+       public SimpleMongoDBNamespaceManager(DBCollection nameSpaceCollection) {
+               nsColl = nameSpaceCollection;
+       }
+       
+       @Override
+       public void createIndices(DBCollection coll){
+               coll.createIndex(PREFIX);
+               coll.createIndex(NAMESPACE);
+       }
+
+
+       @Override
+       public void setConf(MongoDBRdfConfiguration paramC) {
+               this.conf = paramC;
+       }
+
+       @Override
+       public MongoDBRdfConfiguration getConf() {
+               // TODO Auto-generated method stub
+               return conf;
+       }
+
+       @Override
+       public void addNamespace(String prefix, String namespace)
+                       throws RyaDAOException {
+               String id = prefix;
+               byte[] bytes = id.getBytes();
+               try {
+                       MessageDigest digest = 
MessageDigest.getInstance("SHA-1");
+                       bytes = digest.digest(bytes);
+               } catch (NoSuchAlgorithmException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+               BasicDBObject doc = new BasicDBObject(ID, new 
String(Hex.encodeHex(bytes)))
+               .append(PREFIX, prefix)
+           .append(NAMESPACE, namespace);
+               nsColl.insert(doc);
+               
+       }
+
+       @Override
+       public String getNamespace(String prefix) throws RyaDAOException {
+        DBObject query = new BasicDBObject().append(PREFIX, prefix);
+        DBCursor cursor = nsColl.find(query);
+        String nameSpace = prefix;
+        while (cursor.hasNext()){
+          DBObject obj = cursor.next();        
+          nameSpace = (String) obj.toMap().get(NAMESPACE);
+        }
+        return nameSpace;
+       }
+
+       @Override
+       public void removeNamespace(String prefix) throws RyaDAOException {
+        DBObject query = new BasicDBObject().append(PREFIX, prefix);
+               nsColl.remove(query);
+       }
+
+       @Override
+       public CloseableIteration<? extends Namespace, RyaDAOException> 
iterateNamespace()
+                       throws RyaDAOException {
+        DBObject query = new BasicDBObject();
+        DBCursor cursor = nsColl.find(query);
+               return new MongoCursorIteration(cursor);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
new file mode 100644
index 0000000..6de5b89
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
@@ -0,0 +1,132 @@
+package mvm.rya.mongodb.dao;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.query.RyaQuery;
+
+import org.apache.commons.codec.binary.Hex;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy {
+
+       private static final String ID = "_id";
+       private static final String OBJECT_TYPE = "objectType";
+       private static final String CONTEXT = "context";
+       private static final String PREDICATE = "predicate";
+       private static final String OBJECT = "object";
+       private static final String SUBJECT = "subject";
+       private ValueFactoryImpl factory = new ValueFactoryImpl();
+
+
+       public SimpleMongoDBStorageStrategy() {
+       }
+       
+       @Override
+       public void createIndices(DBCollection coll){
+               coll.createIndex("subject");
+               coll.createIndex("predicate");
+               BasicDBObject doc = new BasicDBObject();
+           doc.put(SUBJECT, 1);
+           doc.put(PREDICATE, 1);
+               coll.createIndex(doc);
+               doc = new BasicDBObject(OBJECT, 1);
+               doc.put(OBJECT_TYPE, 1);
+               doc.put(PREDICATE, 1);
+               coll.createIndex(doc);
+               doc = new BasicDBObject(OBJECT, 1);
+               doc.put(OBJECT_TYPE, 1);
+               coll.createIndex(doc);
+               doc = new BasicDBObject(OBJECT, 1);
+               doc = new BasicDBObject(OBJECT_TYPE, 1);
+               doc.put(SUBJECT, 1);
+               coll.createIndex(doc);
+       }
+
+       @Override
+       public DBObject getQuery(RyaStatement stmt) {
+               RyaURI subject = stmt.getSubject();
+               RyaURI predicate = stmt.getPredicate();
+               RyaType object = stmt.getObject();
+               RyaURI context = stmt.getContext();
+               BasicDBObject query = new BasicDBObject();
+               if (subject != null){
+                       query.append(SUBJECT, subject.getData());
+               }
+               if (object != null){
+                       query.append(OBJECT, object.getData());
+                       query.append(OBJECT_TYPE, 
object.getDataType().toString());
+               }
+               if (predicate != null){
+                       query.append(PREDICATE, predicate.getData());
+               }
+               if (context != null){
+                       query.append(CONTEXT, context.getData());
+               }
+               
+               return query;
+       }
+
+       @Override
+       public RyaStatement deserializeDBObject(DBObject queryResult) {
+               Map result = queryResult.toMap();
+               String subject = (String) result.get(SUBJECT);
+               String object = (String) result.get(OBJECT);
+               String objectType = (String) result.get(OBJECT_TYPE);
+               String predicate = (String) result.get(PREDICATE);
+               String context = (String) result.get(CONTEXT);
+               RyaType objectRya = null;
+               if 
(objectType.equalsIgnoreCase("http://www.w3.org/2001/XMLSchema#anyURI";)){
+                       objectRya = new RyaURI(object);
+               }
+               else {
+                       objectRya = new RyaType(factory.createURI(objectType), 
object);
+               }
+               
+               if (!context.isEmpty()){
+                       return new RyaStatement(new RyaURI(subject), new 
RyaURI(predicate), objectRya,
+                                       new RyaURI(context));                   
+               }
+               return new RyaStatement(new RyaURI(subject), new 
RyaURI(predicate), objectRya);
+       }
+
+       @Override
+       public DBObject serialize(RyaStatement statement){
+               String context = "";
+               if (statement.getContext() != null){
+                       context = statement.getContext().getData();
+               }
+               String id = statement.getSubject().getData() + " " + 
+                               statement.getPredicate().getData() + " " +  
statement.getObject().getData() + " " + context;
+               byte[] bytes = id.getBytes();
+               try {
+                       MessageDigest digest = 
MessageDigest.getInstance("SHA-1");
+                       bytes = digest.digest(bytes);
+               } catch (NoSuchAlgorithmException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+               BasicDBObject doc = new BasicDBObject(ID, new 
String(Hex.encodeHex(bytes)))
+               .append(SUBJECT, statement.getSubject().getData())
+           .append(PREDICATE, statement.getPredicate().getData())
+           .append(OBJECT, statement.getObject().getData())
+           .append(OBJECT_TYPE, statement.getObject().getDataType().toString())
+           .append(CONTEXT, context);
+               return doc;
+               
+       }
+
+       @Override
+       public DBObject getQuery(RyaQuery ryaQuery) {
+               return getQuery(ryaQuery.getQuery());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
new file mode 100644
index 0000000..48f0931
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
@@ -0,0 +1,37 @@
+package mvm.rya.mongodb.iter;
+
+import java.util.Iterator;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+
+public class NonCloseableRyaStatementCursorIterator implements 
Iterator<RyaStatement> {
+
+       RyaStatementCursorIterator iterator;
+       
+       @Override
+       public boolean hasNext() {
+               return iterator.hasNext();
+       }
+
+       @Override
+       public RyaStatement next() {
+               return iterator.next();
+       }
+
+       public NonCloseableRyaStatementCursorIterator(
+                       RyaStatementCursorIterator iterator) {
+               this.iterator = iterator;
+       }
+
+       @Override
+       public void remove() {
+               try {
+                       iterator.remove();
+               } catch (RyaDAOException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
new file mode 100644
index 0000000..b699d96
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
@@ -0,0 +1,88 @@
+package mvm.rya.mongodb.iter;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+public class RyaStatementBindingSetCursorIterator implements 
CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
+
+       private DBCollection coll;
+       private Map<DBObject, BindingSet> rangeMap;
+       private Iterator<DBObject> queryIterator;
+       private Long maxResults;
+       private DBCursor currentCursor;
+       private BindingSet currentBindingSet;
+       private MongoDBStorageStrategy strategy;
+
+       public RyaStatementBindingSetCursorIterator(DBCollection coll,
+                       Map<DBObject, BindingSet> rangeMap, 
MongoDBStorageStrategy strategy) {
+               this.coll = coll;
+               this.rangeMap = rangeMap;
+               this.queryIterator = rangeMap.keySet().iterator();
+               this.strategy = strategy;
+       }
+
+       @Override
+       public boolean hasNext() {
+               if (!currentCursorIsValid()) {
+                       findNextValidCursor();
+               }
+               return currentCursorIsValid();
+       }
+
+       @Override
+       public Entry<RyaStatement, BindingSet> next() {
+               if (!currentCursorIsValid()) {
+                       findNextValidCursor();
+               }
+               if (currentCursorIsValid()) {
+                       // convert to Rya Statement
+                       DBObject queryResult = currentCursor.next();
+                       RyaStatement statement = 
strategy.deserializeDBObject(queryResult);
+                       return new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, 
currentBindingSet);
+               }
+               return null;
+       }
+       
+       private void findNextValidCursor() {
+               while (queryIterator.hasNext()){
+                       DBObject currentQuery = queryIterator.next();
+                       currentCursor = coll.find(currentQuery);
+                       currentBindingSet = rangeMap.get(currentQuery);
+                       if (currentCursor.hasNext()) break;
+               }
+       }
+       
+       private boolean currentCursorIsValid() {
+               return (currentCursor != null) && currentCursor.hasNext();
+       }
+
+
+       public void setMaxResults(Long maxResults) {
+               this.maxResults = maxResults;
+       }
+
+       @Override
+       public void close() throws RyaDAOException {
+               // TODO don't know what to do here
+       }
+
+       @Override
+       public void remove() throws RyaDAOException {
+               next();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
new file mode 100644
index 0000000..d69ab65
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
@@ -0,0 +1,47 @@
+package mvm.rya.mongodb.iter;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+
+import org.calrissian.mango.collect.CloseableIterable;
+import org.calrissian.mango.collect.CloseableIterator;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+public class RyaStatementCursorIterable implements 
CloseableIterable<RyaStatement> {
+
+
+       private NonCloseableRyaStatementCursorIterator iterator;
+
+       public 
RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) {
+               this.iterator = iterator;
+       }
+
+       @Override
+       public Iterator<RyaStatement> iterator() {
+               // TODO Auto-generated method stub
+               return iterator;
+       }
+
+       @Override
+       public void closeQuietly() {
+               //TODO  don't know what to do here
+       }
+
+       @Override
+       public void close() throws IOException {
+               // TODO Auto-generated method stub
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
new file mode 100644
index 0000000..8b2ae3b
--- /dev/null
+++ 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
@@ -0,0 +1,84 @@
+package mvm.rya.mongodb.iter;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+public class RyaStatementCursorIterator implements 
CloseableIteration<RyaStatement, RyaDAOException> {
+
+       private DBCollection coll;
+       private Iterator<DBObject> queryIterator;
+       private DBCursor currentCursor;
+       private MongoDBStorageStrategy strategy;
+       private Long maxResults;
+
+       public RyaStatementCursorIterator(DBCollection coll, Set<DBObject> 
queries, MongoDBStorageStrategy strategy) {
+               this.coll = coll;
+               this.queryIterator = queries.iterator();
+               this.strategy = strategy;
+       }
+
+       @Override
+       public boolean hasNext() {
+               if (!currentCursorIsValid()) {
+                       findNextValidCursor();
+               }
+               return currentCursorIsValid();
+       }
+
+       @Override
+       public RyaStatement next() {
+               if (!currentCursorIsValid()) {
+                       findNextValidCursor();
+               }
+               if (currentCursorIsValid()) {
+                       // convert to Rya Statement
+                       DBObject queryResult = currentCursor.next();
+                       RyaStatement statement = 
strategy.deserializeDBObject(queryResult);
+                       return statement;
+               }
+               return null;
+       }
+       
+       private void findNextValidCursor() {
+               while (queryIterator.hasNext()){
+                       DBObject currentQuery = queryIterator.next();
+                       currentCursor = coll.find(currentQuery);
+                       if (currentCursor.hasNext()) break;
+               }
+       }
+       
+       private boolean currentCursorIsValid() {
+               return (currentCursor != null) && currentCursor.hasNext();
+       }
+
+
+       public void setMaxResults(Long maxResults) {
+               this.maxResults = maxResults;
+       }
+
+       @Override
+       public void close() throws RyaDAOException {
+               // TODO don't know what to do here
+       }
+
+       @Override
+       public void remove() throws RyaDAOException {
+               next();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/pom.xml
----------------------------------------------------------------------
diff --git a/dao/pom.xml b/dao/pom.xml
new file mode 100644
index 0000000..8794f4a
--- /dev/null
+++ b/dao/pom.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="utf-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>parent</artifactId>
+        <version>3.2.9</version>
+    </parent>
+    <artifactId>rya.dao</artifactId>
+    <packaging>pom</packaging>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <modules>
+        <module>accumulo.rya</module>
+               <module>mongodb.rya</module>
+    </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/pom.xml
----------------------------------------------------------------------
diff --git a/extras/cloudbase.rya.giraph/pom.xml 
b/extras/cloudbase.rya.giraph/pom.xml
new file mode 100644
index 0000000..c894ea2
--- /dev/null
+++ b/extras/cloudbase.rya.giraph/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.0.4-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>cloudbase.rya.giraph</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.giraph</groupId>
+            <artifactId>giraph</artifactId>
+            <version>0.2-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>cloudbase</groupId>
+            <artifactId>cloudbase-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>cloudbase.rya</artifactId>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <id>mr</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <!-- NOTE: We don't need a groupId specification 
because the group is
+                  org.apache.maven.plugins ...which is assumed by default. -->
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <dependencies>
+                            <dependency>
+                                <groupId>mvm.cloud</groupId>
+                                <artifactId>hadoop-job-assembly</artifactId>
+                                <version>1.0.0-SNAPSHOT</version>
+                            </dependency>
+                        </dependencies>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                                <configuration>
+                                    <attach>false</attach>
+                                    <descriptors>
+                                        
<descriptor>assemblies/job.xml</descriptor>
+                                    </descriptors>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java
 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java
new file mode 100644
index 0000000..490b64d
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java
@@ -0,0 +1,88 @@
+package mvm.rya.cloudbase.giraph.format;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.collect.Maps;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Date: 7/27/12
+ * Time: 1:39 PM
+ */
/**
 * VertexInputFormat that reads Rya triples from a Cloudbase SPO table and
 * exposes each statement as a vertex: the subject becomes the vertex id,
 * and each (predicate, object) pair becomes an outgoing edge keyed by
 * predicate with the object as the edge value.
 *
 * Date: 7/27/12
 * Time: 1:39 PM
 */
public class CloudbaseRyaVertexInputFormat
        extends CloudbaseVertexInputFormat<Text, Text, Text, Text> {

    private Configuration conf;

    /**
     * Creates a reader that wraps the underlying Cloudbase record reader.
     *
     * @param split   input split assigned to this task
     * @param context task context
     * @return a reader producing Text-typed vertices
     * @throws IOException if the delegate reader cannot be created
     */
    public VertexReader<Text, Text, Text, Text>
    createVertexReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        try {

            return new CloudbaseEdgeVertexReader(
                    cloudbaseInputFormat.createRecordReader(split, context)) {
            };
        } catch (InterruptedException e) {
            // Surface interruption as an IOException, preserving the cause.
            throw new IOException(e);
        }

    }

    /*
       Reader takes Key/Value pairs from the underlying input format.
    */
    public static class CloudbaseEdgeVertexReader
            extends CloudbaseVertexReader<Text, Text, Text, Text> {

        // Shared Rya (de)serialization context used to decode table rows.
        private RyaContext ryaContext = RyaContext.getInstance();

        public CloudbaseEdgeVertexReader(RecordReader<Key, Value> recordReader) {
            super(recordReader);
        }

        /** Advances the delegate reader; true while more rows remain. */
        public boolean nextVertex() throws IOException, InterruptedException {
            return getRecordReader().nextKeyValue();
        }

        /*
       Each Key/Value contains the information needed to construct the vertices.
         */
        public Vertex<Text, Text, Text, Text> getCurrentVertex()
                throws IOException, InterruptedException {
            try {
                Key key = getRecordReader().getCurrentKey();
                // NOTE(review): 'value' is read but never used below -- the
                // triple is decoded entirely from the key. Confirm intended.
                Value value = getRecordReader().getCurrentValue();
                // Decode the row/CF/CQ of the key as an SPO-layout triple.
                RyaStatement ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
                        new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(),
                                key.getColumnQualifier().getBytes()));//TODO: assume spo for now
                Vertex<Text, Text, Text, Text> vertex =
                        BspUtils.<Text, Text, Text, Text>createVertex(
                                getContext().getConfiguration());
                Text vertexId = new Text(ryaStatement.getSubject().getData()); //TODO: set Text?
                Map<Text, Text> edges = Maps.newHashMap();
                Text edgeId = new Text(ryaStatement.getPredicate().getData());
                edges.put(edgeId, new Text(ryaStatement.getObject().getData()));
                // Vertex value is left empty; only id and edges carry data.
                vertex.initialize(vertexId, new Text(), edges, null);

                return vertex;
            } catch (Exception e) {
                // Wrap any decode failure, keeping the original cause.
                throw new IOException(e);
            }
        }
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexOutputFormat.java
 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexOutputFormat.java
new file mode 100644
index 0000000..acdbe51
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mvm.rya.cloudbase.giraph.format;
+
+import cloudbase.core.data.Mutation;
+import cloudbase.core.data.Value;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.cloudbase.RyaTableMutationsFactory;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/*
+ Example subclass for writing vertices back to Cloudbase.
+ */
/*
 Example subclass for writing vertices back to Cloudbase.
 */
public class CloudbaseRyaVertexOutputFormat
        extends CloudbaseVertexOutputFormat<Text, Text, Text> {

    /**
     * Creates a writer bound to the table named by the {@code OUTPUT_TABLE}
     * configuration key.
     *
     * @param context task context carrying the job configuration
     * @return writer that serializes vertices as Rya SPO mutations
     * @throws IOException if {@code OUTPUT_TABLE} was never configured
     */
    public VertexWriter<Text, Text, Text>
    createVertexWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        RecordWriter<Text, Mutation> writer =
                cloudbaseOutputFormat.getRecordWriter(context);
        String tableName = context.getConfiguration().get(OUTPUT_TABLE);
        if (tableName == null)
            throw new IOException("Forgot to set table name " +
                    "using CloudbaseVertexOutputFormat.OUTPUT_TABLE");
        return new CloudbaseEdgeVertexWriter(writer, tableName);
    }

    /*
    Wraps RecordWriter for writing Mutations back to the configured Cloudbase Table.
     */
    public static class CloudbaseEdgeVertexWriter
            extends CloudbaseVertexWriter<Text, Text, Text> {

        // Converts (subject, predicate, object) triples into table mutations.
        public static final RyaTableMutationsFactory RYA_TABLE_MUTATIONS_FACTORY = new RyaTableMutationsFactory();
        // NOTE(review): CF and PARENT are never used in this class -- possibly
        // leftovers from the example this was copied from; confirm.
        private final Text CF = new Text("cf");
        private final Text PARENT = new Text("parent");
        // Destination table, cached as Text for RecordWriter.write().
        private Text tableName;

        public CloudbaseEdgeVertexWriter(
                RecordWriter<Text, Mutation> writer, String tableName) {
            super(writer);
            this.tableName = new Text(tableName);
        }

        /*
        Write back a mutation that adds a qualifier for 'parent' containing the vertex value
        as the cell value. Assume the vertex ID corresponds to a key.
        */
        public void writeVertex(Vertex<Text, Text, Text, ?> vertex)
                throws IOException, InterruptedException {
            RecordWriter<Text, Mutation> writer = getRecordWriter();
            Text subj = vertex.getId();
            Iterable<Edge<Text, Text>> edges = vertex.getEdges();
            for (Edge<Text, Text> edge : edges) {
                // Each edge maps back to one RDF statement:
                // subject = vertex id, predicate = edge id, object = edge value.
                Text pred = edge.getTargetVertexId();
                Text obj = edge.getValue();
                Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize =
                        RYA_TABLE_MUTATIONS_FACTORY.serialize(new RyaURI(subj.toString()),
                                new RyaURI(pred.toString()), new RyaType(obj.toString()), null);
                // Only the SPO-layout mutations are written to the one table.
                Collection<Mutation> mutations = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
                for (Mutation mut : mutations) {
                    writer.write(tableName, mut); //TODO: Assuming SPO
                }
            }
        }
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexInputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexInputFormat.java
 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexInputFormat.java
new file mode 100644
index 0000000..fcc0c5e
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexInputFormat.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mvm.rya.cloudbase.giraph.format;
+
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class which wraps the CloudbaseInputFormat. It's designed
+ * as an extension point to VertexInputFormat subclasses who wish
+ * to read from Cloudbase Tables.
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ * @param <M> message type
+ */
+public abstract class CloudbaseVertexInputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> implements Configurable {
+    /**
+     * delegate input format for all cloudbase operations.
+     */
+    protected CloudbaseInputFormat cloudbaseInputFormat =
+            new CloudbaseInputFormat();
+
+    /**
+     * Configured and injected by the job
+     */
+    private Configuration conf;
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Abstract class which provides a template for instantiating vertices
+     * from Cloudbase Key/Value pairs.
+     *
+     * @param <I> vertex id type
+     * @param <V> vertex value type
+     * @param <E> edge type
+     * @param <M> message type
+     */
+    public abstract static class CloudbaseVertexReader<
+            I extends WritableComparable,
+            V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
+
+        /**
+         * Used by subclasses to read key/value pairs.
+         */
+        private final RecordReader<Key, Value> reader;
+        /**
+         * Context passed to initialize
+         */
+        private TaskAttemptContext context;
+
+        /**
+         * Constructor used to pass Record Reader instance
+         *
+         * @param reader Cloudbase record reader
+         */
+        public CloudbaseVertexReader(RecordReader<Key, Value> reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public void initialize(InputSplit inputSplit,
+                               TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            reader.initialize(inputSplit, context);
+            this.context = context;
+        }
+
+        /**
+         * close
+         *
+         * @throws IOException
+         */
+        public void close() throws IOException {
+            reader.close();
+        }
+
+        /**
+         * getProgress
+         *
+         * @return progress
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        public float getProgress() throws IOException, InterruptedException {
+            return reader.getProgress();
+        }
+
+        /**
+         * Get the result record reader
+         *
+         * @return Record reader to be used for reading.
+         */
+        protected RecordReader<Key, Value> getRecordReader() {
+            return reader;
+        }
+
+        /**
+         * getContext
+         *
+         * @return Context passed to initialize.
+         */
+        protected TaskAttemptContext getContext() {
+            return context;
+        }
+
+    }
+
+    /**
+     * getSplits
+     *
+     * @param context    Context of the job
+     * @param numWorkers Number of workers used for this job
+     * @return tablet splits
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public List<InputSplit> getSplits(
+            JobContext context, int numWorkers)
+            throws IOException, InterruptedException {
+        List<InputSplit> splits = null;
+        try {
+            splits = cloudbaseInputFormat.getSplits(context);
+        } catch (IOException e) {
+            if (e.getMessage().contains("Input info has not been set")) {
+                throw new IOException(e.getMessage() +
+                        " Make sure you initialized" +
+                        " CloudbaseInputFormat static setters " +
+                        "before passing the config to GiraphJob.");
+            }
+        }
+        return splits;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexOutputFormat.java
 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexOutputFormat.java
new file mode 100644
index 0000000..f88dfe6
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseVertexOutputFormat.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mvm.rya.cloudbase.giraph.format;
+
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
+import cloudbase.core.data.Mutation;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+/**
+ *
+ *  Class which wraps the CloudbaseOutputFormat. It's designed
+ *  as an extension point to VertexOutputFormat subclasses who wish
+ *  to write vertices back to an Cloudbase table.
+ *
+ *
+ * @param <I> vertex id type
+ * @param <V>  vertex value type
+ * @param <E>  edge type
+ */
+public abstract class CloudbaseVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> implements Configurable {
+
+
+  /**
+   * Output table parameter
+   */
+  protected static final String OUTPUT_TABLE = "OUTPUT_TABLE";
+
+  /**
+   * Cloudbase delegate for table output
+   */
+  protected CloudbaseOutputFormat cloudbaseOutputFormat =
+          new CloudbaseOutputFormat();
+
+
+  /**
+   * Used by configured interface
+   */
+  private Configuration conf;
+
+  /**
+   *
+   * Main abstraction point for vertex writers to persist back
+   * to Cloudbase tables.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E>  edge type
+   */
+  public abstract static class CloudbaseVertexWriter<
+          I extends WritableComparable,
+          V extends Writable,
+          E extends Writable>
+          implements VertexWriter<I, V, E> {
+
+    /**
+     * task attempt context.
+     */
+    private TaskAttemptContext context;
+
+    /**
+     * Cloudbase record writer
+     */
+    private RecordWriter<Text, Mutation> recordWriter;
+
+    /**
+     * Constructor for use with subclasses
+     *
+     * @param recordWriter cloudbase record writer
+     */
+    public CloudbaseVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
+      this.recordWriter = recordWriter;
+    }
+
+    /**
+     * initialize
+     *
+     * @param context Context used to write the vertices.
+     * @throws IOException
+     */
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
+    }
+
+    /**
+     *  close
+     *
+     * @param context the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+
+    /**
+     * Get the table record writer;
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Mutation> getRecordWriter() {
+      return recordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   *
+   * checkOutputSpecs
+   *
+   * @param context information about the job
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    try {
+      cloudbaseOutputFormat.checkOutputSpecs(context);
+    } catch (IOException e) {
+      if (e.getMessage().contains("Output info has not been set")) {
+        throw new IOException(e.getMessage() + " Make sure you initialized" +
+                " CloudbaseOutputFormat static setters " +
+                "before passing the config to GiraphJob.");
+      }
+    }
+  }
+
+  /**
+   * getOutputCommitter
+   *
+   * @param context the task context
+   * @return OutputCommitter
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return cloudbaseOutputFormat.getOutputCommitter(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/PrintVertexOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/PrintVertexOutputFormat.java
 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/PrintVertexOutputFormat.java
new file mode 100644
index 0000000..e90ca66
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/PrintVertexOutputFormat.java
@@ -0,0 +1,94 @@
+package mvm.rya.cloudbase.giraph.format;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Date: 7/27/12
+ * Time: 2:58 PM
+ */
/**
 * Debug-oriented VertexOutputFormat that writes each vertex to stdout via
 * {@code System.out.println} instead of persisting it anywhere. All other
 * lifecycle hooks (configuration, output-spec checks, commit protocol) are
 * deliberate no-ops.
 *
 * Date: 7/27/12
 * Time: 2:58 PM
 */
public class PrintVertexOutputFormat<
        I extends WritableComparable,
        V extends Writable,
        E extends Writable>
        extends VertexOutputFormat<I, V, E> implements Configurable {
    @Override
    public void setConf(Configuration entries) {
        // No configuration needed: output goes straight to stdout.
    }

    @Override
    public Configuration getConf() {
        return null;  // No configuration is held by this format.
    }

    @Override
    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new VertexWriter<I, V, E>() {
            @Override
            public void initialize(TaskAttemptContext context) throws IOException {
                // Nothing to set up for stdout output.
            }

            @Override
            public void writeVertex(Vertex<I, V, E, ?> iveVertex) throws IOException, InterruptedException {
                // Relies on the Vertex's toString() for the printed form.
                System.out.println(iveVertex);
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                // stdout is never closed by this writer.
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // No output spec to validate; printing always succeeds.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // A no-op committer: nothing is staged, so nothing needs committing.
        return new OutputCommitter() {
            @Override
            public void setupJob(JobContext jobContext) throws IOException {
                // no-op
            }

            @Override
            public void cleanupJob(JobContext jobContext) throws IOException {
                // no-op
            }

            @Override
            public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
                // no-op
            }

            @Override
            public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
                return false;  // nothing staged, so no commit required
            }

            @Override
            public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
                // no-op
            }

            @Override
            public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
                // no-op
            }
        };
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/BspCase.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/BspCase.java
 
b/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/BspCase.java
new file mode 100644
index 0000000..fb20dd8
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/BspCase.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mvm.rya.cloudbase.giraph.format;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.zk.ZooKeeperExt;
+
+import junit.framework.TestCase;
+
+/**
+ * Duplicate copy from main giraph trunk. At least until there
+ * is a maven test artifact for Giraph.
+ *
+ * Extended TestCase for making setting up Bsp testing.
+ */
+public class BspCase extends TestCase implements Watcher {
+  /** JobTracker system property */
+  private final String jobTracker =
+      System.getProperty("prop.mapred.job.tracker");
+  /** Jar location system property */
+  private final String jarLocation =
+      System.getProperty("prop.jarLocation", "");
+  /** Number of actual processes for the BSP application */
+  private int numWorkers = 1;
+  /** ZooKeeper list system property */
+  private final String zkList = System.getProperty("prop.zookeeper.list");
+
+  /**
+   * Adjust the configuration to the basic test case
+   */
+  public final void setupConfiguration(GiraphJob job) {
+    Configuration conf = job.getConfiguration();
+    conf.set("mapred.jar", getJarLocation());
+
+    // Allow this test to be run on a real Hadoop setup
+    if (getJobTracker() != null) {
+      System.out.println("setup: Sending job to job tracker " +
+          getJobTracker() + " with jar path " + getJarLocation()
+          + " for " + getName());
+      conf.set("mapred.job.tracker", getJobTracker());
+      job.setWorkerConfiguration(getNumWorkers(),
+          getNumWorkers(),
+          100.0f);
+    }
+    else {
+      System.out.println("setup: Using local job runner with " +
+          "location " + getJarLocation() + " for "
+          + getName());
+      job.setWorkerConfiguration(1, 1, 100.0f);
+      // Single node testing
+      conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+    }
+    conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
+    conf.setInt(GiraphJob.POLL_MSECS, 3*1000);
+    conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+    if (getZooKeeperList() != null) {
+      job.setZooKeeperConfiguration(getZooKeeperList());
+    }
+    // GeneratedInputSplit will generate 5 vertices
+    conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+  }
+
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public BspCase(String testName) {
+    super(testName);
+
+  }
+
+  /**
+   * Get the number of workers used in the BSP application
+   *
+   */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /**
+   * Get the ZooKeeper list
+   */
+  public String getZooKeeperList() {
+    return zkList;
+  }
+
+  /**
+   * Get the jar location
+   *
+   * @return location of the jar file
+   */
+  String getJarLocation() {
+    return jarLocation;
+  }
+
+  /**
+   * Get the job tracker location
+   *
+   * @return job tracker location as a string
+   */
+  String getJobTracker() {
+    return jobTracker;
+  }
+
+  /**
+   * Get the single part file status and make sure there is only one part
+   *
+   * @param job Job to get the file system from
+   * @param partDirPath Directory where the single part file should exist
+   * @return Single part file status
+   * @throws java.io.IOException
+   */
+  public static FileStatus getSinglePartFileStatus(GiraphJob job,
+      Path partDirPath) throws IOException {
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    FileStatus[] statusArray = fs.listStatus(partDirPath);
+    FileStatus singlePartFileStatus = null;
+    int partFiles = 0;
+    for (FileStatus fileStatus : statusArray) {
+      if (fileStatus.getPath().getName().equals("part-m-00000")) {
+        singlePartFileStatus = fileStatus;
+      }
+      if (fileStatus.getPath().getName().startsWith("part-m-")) {
+        ++partFiles;
+      }
+    }
+    if (partFiles != 1) {
+      throw new ArithmeticException(
+          "getSinglePartFile: Part file count should be 1, but is " +
+              partFiles);
+    }
+    return singlePartFileStatus;
+  }
+
+  @Override
+  public void setUp() {
+    if (jobTracker != null) {
+      System.out.println("Setting tasks to 3 for " + getName() +
+          " since JobTracker exists...");
+      numWorkers = 3;
+    }
+    try {
+      Configuration conf = new Configuration();
+      FileSystem hdfs = FileSystem.get(conf);
+      // Since local jobs always use the same paths, remove them
+      Path oldLocalJobPaths = new Path(
+          GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
+      FileStatus[] fileStatusArr;
+      try {
+        fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
+        for (FileStatus fileStatus : fileStatusArr) {
+          if (fileStatus.isDir() &&
+              fileStatus.getPath().getName().contains("job_local")) {
+            System.out.println("Cleaning up local job path " +
+                fileStatus.getPath().getName());
+            hdfs.delete(oldLocalJobPaths, true);
+          }
+        }
+      } catch (FileNotFoundException e) {
+        // ignore this FileNotFound exception and continue.
+      }
+      if (zkList == null) {
+        return;
+      }
+      ZooKeeperExt zooKeeperExt =
+          new ZooKeeperExt(zkList, 30*1000, this);
+      List<String> rootChildren = zooKeeperExt.getChildren("/", false);
+      for (String rootChild : rootChildren) {
+        if (rootChild.startsWith("_hadoopBsp")) {
+          List<String> children =
+              zooKeeperExt.getChildren("/" + rootChild, false);
+          for (String child: children) {
+            if (child.contains("job_local_")) {
+              System.out.println("Cleaning up /_hadoopBsp/" +
+                  child);
+              zooKeeperExt.deleteExt(
+                  "/_hadoopBsp/" + child, -1, true);
+            }
+          }
+        }
+      }
+      zooKeeperExt.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // Do nothing
+  }
+
+  /**
+   * Helper method to remove an old output directory if it exists,
+   * and set the output path for any VertexOutputFormat that uses
+   * FileOutputFormat.
+   *
+   * @param job Job to set the output path for
+   * @throws java.io.IOException
+   */
+  public static void removeAndSetOutput(GiraphJob job,
+      Path outputPath) throws IOException {
+    remove(job.getConfiguration(), outputPath);
+    FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws java.io.IOException
+   */
+  public static void remove(Configuration conf, Path path)
+      throws IOException {
+    FileSystem hdfs = FileSystem.get(conf);
+    hdfs.delete(path, true);
+  }
+
+  public static String getCallingMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/TestCloudbaseVertexFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/TestCloudbaseVertexFormat.java
 
b/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/TestCloudbaseVertexFormat.java
new file mode 100644
index 0000000..e420ff6
--- /dev/null
+++ 
b/extras/cloudbase.rya.giraph/src/test/java/mvm/rya/cloudbase/giraph/format/TestCloudbaseVertexFormat.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mvm.rya.cloudbase.giraph.format;
+
+import cloudbase.core.client.BatchWriter;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.ZooKeeperInstance;
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
+import cloudbase.core.client.mock.MockInstance;
+import cloudbase.core.data.Range;
+import cloudbase.core.security.Authorizations;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
+import mvm.rya.cloudbase.CloudbaseRyaDAO;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.lib.TextVertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+/*
+    Test class for Cloudbase vertex input/output formats.
+ */
/*
    Test class for Cloudbase vertex input/output formats.

    NOTE(review): currently wired to a live ZooKeeper cluster
    ("stratus" / "stratus13:2181") with hard-coded root credentials instead
    of the commented-out MockInstance, so it can only run against that
    environment -- confirm whether the mock setup should be restored.
 */
public class TestCloudbaseVertexFormat extends BspCase {

    // Table and column constants for the test data set.
    private final String TABLE_NAME = "rya_spo";
    private final String INSTANCE_NAME = "stratus";
    private final Text FAMILY = new Text("cf");
    private final Text CHILDREN = new Text("children");
    private final String USER = "root";
    private final byte[] PASSWORD = new byte[]{};
    private final Text OUTPUT_FIELD = new Text("parent");


    private final Logger log = Logger.getLogger(TestCloudbaseVertexFormat.class);

    /**
     * Create the test case
     *
     * @param testName name of the test case
     */
    public TestCloudbaseVertexFormat(String testName) {
        super(testName);
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite() {
        return new TestSuite(TestCloudbaseVertexFormat.class);

    }

    /*
    Write a simple parent-child directed graph to Cloudbase.
    Run a job which reads the values
    into subclasses that extend CloudbaseVertex I/O formats.
    Check the output after the job.
    */
    public void testCloudbaseInputOutput() throws Exception {
//        if (System.getProperty("prop.mapred.job.tracker") != null) {
//            if(log.isInfoEnabled())
//                log.info("testCloudbaseInputOutput: " +
//                        "Ignore this test if not local mode.");
//            return;
//        }
//
//        System.setProperty("prop.jarLocation", "/temp/cloudbase.rya.giraph-3.0.0-SNAPSHOT.jar");
        // The Giraph jar must be available so the MR job can ship it.
        File jarTest = new File(System.getProperty("prop.jarLocation"));
        if (!jarTest.exists()) {
            fail("Could not find Giraph jar at " +
                    "location specified by 'prop.jarLocation'. " +
                    "Make sure you built the main Giraph artifact?.");
        }

        //Write out vertices and edges out to a mock instance.
//        MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
        // NOTE(review): live connection with hard-coded credentials;
        // "password".getBytes() also uses the platform default charset.
        Connector c = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes());
        CloudbaseRyaDAO ryaDAO = new CloudbaseRyaDAO();
        ryaDAO.setConnector(c);
        CloudbaseRdfConfiguration cloudbaseRdfConfiguration = new CloudbaseRdfConfiguration();
//        cloudbaseRdfConfiguration.setTablePrefix("test_");
        ryaDAO.init();
//        c.tableOperations().create(TABLE_NAME);
//        BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);

        // Seed four statements sharing one subject so the input format has a
        // small graph to read back.
        ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
                new RyaURI("urn:test#pred1"),
                new RyaURI("urn:test#obj1")));
        ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
                new RyaURI("urn:test#pred2"),
                new RyaURI("urn:test#obj2")));
        ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
                new RyaURI("urn:test#pred3"),
                new RyaURI("urn:test#obj3")));
        ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
                new RyaURI("urn:test#pred4"),
                new RyaURI("urn:test#obj4")));
        ryaDAO.commit();

//        Mutation m1 = new Mutation(new Text("0001"));
//        m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
//        bw.addMutation(m1);
//
//        Mutation m2 = new Mutation(new Text("0002"));
//        m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
//        bw.addMutation(m2);
//        if(log.isInfoEnabled())
//            log.info("Writing mutations to Cloudbase table");
//        bw.close();

        Configuration conf = new Configuration();
//        conf.set(CloudbaseVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);

        /*
        Very important to initialize the formats before
        sending configuration to the GiraphJob. Otherwise
        the internally constructed Job in GiraphJob will
        not have the proper context initialization.
         */
        GiraphJob job = new GiraphJob(conf, getCallingMethodName());
        CloudbaseInputFormat.setInputInfo(job.getInternalJob(), USER, "password".getBytes(),
                TABLE_NAME, new Authorizations());
//        CloudbaseInputFormat.setMockInstance(job.getInternalJob(), INSTANCE_NAME);
        CloudbaseInputFormat.setZooKeeperInstance(job.getInternalJob(), "stratus", "stratus13:2181");
        // Full-table scan: a single unbounded Range.
        CloudbaseInputFormat.setRanges(job.getInternalJob(), Collections.singleton(new Range()));

//        CloudbaseOutputFormat.setOutputInfo(job.getInternalJob(), USER, PASSWORD, true, null);
//        CloudbaseOutputFormat.setMockInstance(job.getInternalJob(), INSTANCE_NAME);

        setupConfiguration(job);
        job.setVertexClass(EdgeNotification.class);
        job.setVertexInputFormatClass(CloudbaseRyaVertexInputFormat.class);
        job.setVertexOutputFormatClass(PrintVertexOutputFormat.class);
        FileOutputFormat.setOutputPath(job.getInternalJob(), new Path("/temp/graphout"));

//        HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
//        columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
//        CloudbaseInputFormat.fetchColumns(job.getInternalJob(), columnsToFetch);

        if (log.isInfoEnabled())
            log.info("Running edge notification job using Cloudbase input");
        // Only assertion in this test: the Giraph job must complete.
        assertTrue(job.run(true));

//        Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
//        scanner.setRange(new Range("0002", "0002"));
//        scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
//        boolean foundColumn = false;
//
//        if(log.isInfoEnabled())
//            log.info("Verify job output persisted correctly.");
//        //make sure we found the qualifier.
//        assertTrue(scanner.iterator().hasNext());
//
//
//        //now we check to make sure the expected value from the job persisted correctly.
//        for(Map.Entry<Key,Value> entry : scanner) {
//            Text row = entry.getKey().getRow();
//            assertEquals("0002", row.toString());
//            Value value = entry.getValue();
//            assertEquals("0001", ByteBufferUtil.toString(
//                    ByteBuffer.wrap(value.get())));
//            foundColumn = true;
//        }
    }

    /*
   Test compute method that sends each edge a notification of its parents.
   The test set only has a 1-1 parent-to-child ratio for this unit test.
    */
    public static class EdgeNotification
            extends EdgeListVertex<Text, Text, Text, Text> {
        @Override
        public void compute(Iterable<Text> messages) throws IOException {
            System.out.println("Edges: " + messages);
            // Last received message wins as the vertex value.
            for (Text message : messages) {
                getValue().set(message);
            }
            if (getSuperstep() == 0) {
//                sendMessageToAllEdges(getId());
            }
            // Halt after processing; vertex reactivates only on new messages.
            voteToHalt();
        }
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/generic.mr/generic.mr.accumulo/pom.xml 
b/extras/generic.mr/generic.mr.accumulo/pom.xml
new file mode 100644
index 0000000..bc03fef
--- /dev/null
+++ b/extras/generic.mr/generic.mr.accumulo/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>generic.mr</artifactId>
+        <version>3.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>generic.mr.accumulo</artifactId>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <dependencies>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>generic.mr.api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.gmaven</groupId>
+                <artifactId>gmaven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>accumulo</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.accumulo</groupId>
+                    <artifactId>accumulo-core</artifactId>
+                    <optional>true</optional>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>cloudbase</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>com.texeltek</groupId>
+                    <artifactId>accumulo-cloudbase-shim</artifactId>
+                    <optional>true</optional>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy
 
b/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy
new file mode 100644
index 0000000..89d4633
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy
@@ -0,0 +1,146 @@
+package mvm.rya.generic.mr.accumulo
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat
+import org.apache.accumulo.core.data.Key
+import org.apache.accumulo.core.data.Mutation
+import org.apache.accumulo.core.data.Value
+import org.apache.accumulo.core.security.Authorizations
+import org.apache.accumulo.core.security.ColumnVisibility
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.Job
+import mvm.rya.generic.mr.api.MRInfo
+import org.apache.accumulo.core.client.mock.MockInstance
+import org.apache.accumulo.core.client.ZooKeeperInstance
+
+/**
+ * Date: 12/3/12
+ * Time: 9:00 AM
+ */
/**
 * Accumulo implementation of {@link MRInfo}: wires Accumulo input/output
 * formats into MapReduce jobs and wraps Accumulo key/value/mutation
 * primitives behind the backend-neutral MR API.
 *
 * Date: 12/3/12
 * Time: 9:00 AM
 */
class AccumuloMRInfo implements MRInfo {

    // Hadoop configuration (satisfies the Configurable contract).
    def Configuration conf
    // Lazily created, cached Accumulo connector.
    def connector;

    /**
     * Configure {@code job} for Accumulo input from {@code table} and output
     * to {@code outtable}, using either a mock instance or ZooKeeper
     * depending on the MOCK / ZOOKEEPERS settings in the job configuration.
     */
    @Override
    void initMRJob(Job job, String table, String outtable, String[] auths) {
        // NOTE(review): this local shadows the class-level 'conf' field and
        // reads the job's configuration instead -- confirm intentional.
        Configuration conf = job.configuration
        String username = conf.get(USERNAME)
        String password = conf.get(PASSWORD)
        String instance = conf.get(INSTANCE)
        String zookeepers = conf.get(ZOOKEEPERS)
        String mock = conf.get(MOCK)

        //input
        if (Boolean.parseBoolean(mock)) {
            AccumuloInputFormat.setMockInstance(conf, instance)
            AccumuloOutputFormat.setMockInstance(conf, instance)
        } else if (zookeepers != null) {
            AccumuloInputFormat.setZooKeeperInstance(conf, instance, zookeepers)
            AccumuloOutputFormat.setZooKeeperInstance(conf, instance, zookeepers)
        } else {
            throw new IllegalArgumentException("Must specify either mock or zookeepers");
        }

        AccumuloInputFormat.setInputInfo(conf, username, password.getBytes(), table, new Authorizations(auths))
        job.setInputFormatClass(AccumuloInputFormat.class);

        // OUTPUT
        job.setOutputFormatClass(AccumuloOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Mutation.class);
        // 'true' = create the output table if it does not exist.
        AccumuloOutputFormat.setOutputInfo(job, username, password.getBytes(), true, outtable);
    }

    /** Deserialize a Key from its Writable byte form. */
    @Override
    def key(byte[] data) {
        Key key = new Key();
        key.readFields(new DataInputStream(new ByteArrayInputStream(data)))
        return key
    }

    /** Build a Key from its row / column / visibility / timestamp parts. */
    @Override
    def key(String row, String cf, String cq, String cv, long timestamp) {
        return new Key(row, cf, cq, cv, timestamp)
    }

    /** Wrap raw bytes in an Accumulo Value. */
    @Override
    def value(byte[] data) {
        return new Value(data)
    }

    /** Parse a visibility expression into a ColumnVisibility. */
    @Override
    def columnVisibility(String cv) {
        return new ColumnVisibility(cv)
    }

    /** Build a single-put Mutation for the given cell. */
    @Override
    def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val) {
        Mutation mutation = new Mutation(row);
        mutation.put(cf, cq, columnVisibility(cv), timestamp, value(val))
        return mutation
    }

    /**
     * Resolve the Accumulo instance from 'conf': a MockInstance when MOCK is
     * true, otherwise a ZooKeeperInstance; fails when neither is configured.
     */
    @Override
    def instance() {
        assert conf != null

        String instance_str = conf.get(INSTANCE)
        String zookeepers = conf.get(ZOOKEEPERS)
        String mock = conf.get(MOCK)
        if (Boolean.parseBoolean(mock)) {
            return new MockInstance(instance_str)
        } else if (zookeepers != null) {
            return new ZooKeeperInstance(instance_str, zookeepers)
        } else {
            throw new IllegalArgumentException("Must specify either mock or zookeepers");
        }
    }

    /**
     * Return the cached connector, creating it on first use with the
     * credentials from 'conf'. A null {@code instance} falls back to
     * {@link #instance()}.
     */
    @Override
    def connector(def instance) {
        if (connector != null) return connector

        String username = conf.get(USERNAME)
        String password = conf.get(PASSWORD)
        if (instance == null)
            instance = instance()
        connector = instance.getConnector(username, password)
        return connector
    }

    /**
     * Write all mutations through a BatchWriter, then flush and close it.
     * NOTE(review): the writer is not closed if addMutation throws --
     * confirm whether a try/finally is wanted here.
     */
    @Override
    def void writeMutations(def connector, String tableName, Iterator mutations) {
        def bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4);
        mutations.each { m ->
            bw.addMutation(m)
        }
        bw.flush()
        bw.close()
    }

    /** Create a single-threaded scanner over the table. */
    @Override
    def scanner(def connector, String tableName, String[] auths) {
        return connector.createScanner(tableName, new Authorizations(auths))
    }

    /** Create a multi-threaded batch scanner over the table. */
    @Override
    def batchScanner(def connector, String tableName, String[] auths, int numThreads) {
        return connector.createBatchScanner(tableName, new Authorizations(auths), numThreads)
    }

    /** Build a Range; open-ended when {@code endKey} is null. */
    @Override
    def range(def startKey, def endKey) {
        assert startKey != null

        if (endKey != null)
            return new org.apache.accumulo.core.data.Range(startKey, endKey)
        return new org.apache.accumulo.core.data.Range(startKey)
    }

    /** Wrap authorization strings in an Authorizations object. */
    @Override
    def authorizations(String[] auths) {
        return new Authorizations(auths)
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
 
b/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
new file mode 100644
index 0000000..81d47de
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
@@ -0,0 +1 @@
+mvm.rya.generic.mr.accumulo.AccumuloMRInfo
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/generic.mr/generic.mr.api/pom.xml 
b/extras/generic.mr/generic.mr.api/pom.xml
new file mode 100644
index 0000000..ec1e47b
--- /dev/null
+++ b/extras/generic.mr/generic.mr.api/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>generic.mr</artifactId>
+        <version>3.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>generic.mr.api</artifactId>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.gmaven</groupId>
+                <artifactId>gmaven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy
 
b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy
new file mode 100644
index 0000000..bdcc61e
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy
@@ -0,0 +1,43 @@
+package mvm.rya.generic.mr.api
+
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.mapreduce.Job
+
+/**
+ * Date: 12/3/12
+ * Time: 8:56 AM
+ */
/**
 * Backend-neutral abstraction over a key/value MapReduce store.
 * Implementations (e.g. Accumulo) are discovered via ServiceLoader and
 * adapt job setup plus key/value/mutation primitives for their backend.
 *
 * Date: 12/3/12
 * Time: 8:56 AM
 */
public interface MRInfo extends Configurable{

    // Configuration keys the implementation reads connection settings from.
    public static final String USERNAME = "username"
    public static final String PASSWORD = "password"
    public static final String INSTANCE = "instance"
    public static final String ZOOKEEPERS = "zookeepers"
    public static final String MOCK = "mock"

    // Configure the job's input (from table) and output (to outtable).
    def void initMRJob(Job job, String table, String outtable, String[] auths)

    // Deserialize a backend key from its serialized byte form.
    def key(byte[] data);

    // Build a backend key from row / column / visibility / timestamp parts.
    def key(String row, String cf, String cq, String cv, long timestamp);

    // Wrap raw bytes in the backend's value type.
    def value(byte[] data);

    // Parse a visibility expression into the backend's visibility type.
    def columnVisibility(String cv);

    // Build a single-cell mutation.
    def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val);

    // Resolve the backend instance from the configuration.
    def instance()

    // Get (or lazily create) a connector for the given instance.
    def connector(def instance)

    // Write all mutations to the table through the connector.
    def void writeMutations(def connector, String tableName, Iterator mutations)

    // Create a scanner over the table with the given authorizations.
    def scanner(def connector, String tableName, String[] auths)

    // Create a multi-threaded batch scanner over the table.
    def batchScanner(def connector, String tableName, String[] auths, int numThreads)

    // Build a range; endKey may be null for an open-ended range.
    def range(def startKey, def endKey)

    // Wrap authorization strings in the backend's authorizations type.
    def authorizations(String[] auths)
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy
 
b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy
new file mode 100644
index 0000000..a2b92ec
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy
@@ -0,0 +1,28 @@
+package mvm.rya.generic.mr.api
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Date: 12/5/12
+ * Time: 1:32 PM
+ */
/**
 * Locates the active {@link MRInfo} backend implementation through the
 * ServiceLoader mechanism and caches it for the lifetime of the JVM.
 *
 * Date: 12/5/12
 * Time: 1:32 PM
 */
class MRInfoContext {

    // Cached provider; resolved once on first lookup.
    private static currentMrInfo;

    /** Return the cached provider, resolving it without a configuration. */
    public static MRInfo currentMRInfo() {
        return currentMRInfo(null);
    }

    /**
     * Return the MRInfo provider registered on the context classloader,
     * applying {@code config} to it on first resolution.
     * Returns null when no implementation is registered.
     * NOTE(review): lazy init is unsynchronized -- concurrent first calls
     * could race; confirm callers are single-threaded here.
     */
    public static MRInfo currentMRInfo(Configuration config) {
        if (currentMrInfo == null) {
            def iter = ServiceLoader.load(MRInfo.class, Thread.currentThread().getContextClassLoader()).iterator()
            if (iter.hasNext()) {
                currentMrInfo = iter.next()
                if (config != null) currentMrInfo.setConf(config)
            }
        }
        return currentMrInfo
    }

}


Reply via email to