package mvm.rya.mongodb;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.RyaDAO;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.persist.RyaNamespaceManager;
import mvm.rya.api.persist.index.RyaSecondaryIndexer;
import mvm.rya.api.persist.query.RyaQueryEngine;
import mvm.rya.mongodb.dao.MongoDBNamespaceManager;
import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
import mvm.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;

import org.apache.commons.io.IOUtils;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.InsertOptions;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;

/**
 * MongoDB-backed implementation of {@link RyaDAO}. Triples are stored in a
 * single collection (serialized by a {@link MongoDBStorageStrategy}), and
 * namespaces in a second collection. When the configuration requests a test
 * Mongo instance, an embedded server is started via flapdoodle.
 */
public class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration> {

    private MongoDBRdfConfiguration conf;
    private MongoClient mongoClient;
    private DB db;
    private DBCollection coll;
    private MongoDBQueryEngine queryEngine;
    private MongoDBStorageStrategy storageStrategy;
    private MongoDBNamespaceManager nameSpaceManager;
    // Only non-null when conf.getUseTestMongo() is true.
    private MongodForTestsFactory testsFactory;

    private List<RyaSecondaryIndexer> secondaryIndexers;

    /**
     * Creates and immediately initializes the DAO.
     *
     * @param conf connection and collection-name configuration
     * @throws RyaDAOException if the Mongo connection cannot be established
     */
    public MongoDBRyaDAO(MongoDBRdfConfiguration conf) throws RyaDAOException {
        this.conf = conf;
        init();
    }

    public void setConf(MongoDBRdfConfiguration conf) {
        this.conf = conf;
    }

    public MongoDBRdfConfiguration getConf() {
        return conf;
    }

    /**
     * Connects to Mongo (embedded test instance or a configured server,
     * optionally with credentials), wires up the secondary indexers, the
     * query engine, the namespace manager, and creates the triple indexes.
     *
     * @throws RyaDAOException if the connection or index setup fails
     */
    public void init() throws RyaDAOException {
        try {
            boolean useMongoTest = conf.getUseTestMongo();
            if (useMongoTest) {
                testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION);
                mongoClient = testsFactory.newMongo();
                // Publish the embedded server's ephemeral port back into the
                // configuration so other components can find it.
                int port = mongoClient.getServerAddressList().get(0).getPort();
                conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(port));
            } else {
                ServerAddress server = new ServerAddress(
                        conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE),
                        Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT)));
                if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) {
                    MongoCredential cred = MongoCredential.createCredential(
                            conf.get(MongoDBRdfConfiguration.MONGO_USER),
                            conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD),
                            conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME).toCharArray());
                    mongoClient = new MongoClient(server, Arrays.asList(cred));
                } else {
                    mongoClient = new MongoClient(server);
                }
            }
            secondaryIndexers = conf.getAdditionalIndexers();
            for (RyaSecondaryIndexer index : secondaryIndexers) {
                index.setConf(conf);
            }

            db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
            coll = db.getCollection(conf.getTriplesCollectionName());
            nameSpaceManager = new SimpleMongoDBNamespaceManager(
                    db.getCollection(conf.getNameSpacesCollectionName()));
            queryEngine = new MongoDBQueryEngine(conf);
            storageStrategy = new SimpleMongoDBStorageStrategy();
            storageStrategy.createIndices(coll);
        } catch (IOException e) {
            // Was previously swallowed with printStackTrace(), leaving the DAO
            // half-initialized; propagate so callers see the failure.
            // (UnknownHostException is an IOException and is covered here.)
            throw new RyaDAOException(e);
        }
    }

    public boolean isInitialized() throws RyaDAOException {
        // init() either completes fully or throws, so reaching here means ready.
        return true;
    }

    public void destroy() throws RyaDAOException {
        if (mongoClient != null) {
            mongoClient.close();
        }
        // testsFactory can be null if init() failed before starting the
        // embedded server; guard against an NPE during cleanup.
        if (conf.getUseTestMongo() && testsFactory != null) {
            testsFactory.shutdown();
        }

        IOUtils.closeQuietly(queryEngine);
    }

    /**
     * Inserts a single statement and feeds it to all secondary indexers.
     * Duplicate-key errors are deliberately ignored (re-adding an existing
     * triple is a no-op); other failures are propagated.
     */
    public void add(RyaStatement statement) throws RyaDAOException {
        try {
            coll.insert(storageStrategy.serialize(statement));
            for (RyaSecondaryIndexer index : secondaryIndexers) {
                index.storeStatement(statement);
            }
        } catch (com.mongodb.MongoException.DuplicateKey exception) {
            // triple already stored - ignore
        } catch (com.mongodb.DuplicateKeyException exception) {
            // triple already stored - ignore
        } catch (IOException e) {
            // Was previously a broad catch(Exception) that only printed the
            // stack trace, silently dropping indexer failures.
            throw new RyaDAOException(e);
        }
    }

    /**
     * Bulk-inserts statements; insertion continues past duplicates
     * ({@code continueOnError}). Indexer failures abort with an exception.
     */
    public void add(Iterator<RyaStatement> statement) throws RyaDAOException {
        List<DBObject> dbInserts = new ArrayList<DBObject>();
        while (statement.hasNext()) {
            RyaStatement ryaStatement = statement.next();
            DBObject insert = storageStrategy.serialize(ryaStatement);
            dbInserts.add(insert);

            try {
                for (RyaSecondaryIndexer index : secondaryIndexers) {
                    index.storeStatement(ryaStatement);
                }
            } catch (IOException e) {
                throw new RyaDAOException(e);
            }
        }
        coll.insert(dbInserts, new InsertOptions().continueOnError(true));
    }

    public void delete(RyaStatement statement, MongoDBRdfConfiguration conf)
            throws RyaDAOException {
        DBObject obj = storageStrategy.serialize(statement);
        coll.remove(obj);
    }

    public void dropGraph(MongoDBRdfConfiguration conf, RyaURI... graphs)
            throws RyaDAOException {
        // TODO not yet supported for MongoDB
    }

    public void delete(Iterator<RyaStatement> statements,
            MongoDBRdfConfiguration conf) throws RyaDAOException {
        while (statements.hasNext()) {
            RyaStatement ryaStatement = statements.next();
            coll.remove(storageStrategy.serialize(ryaStatement));
        }
    }

    public String getVersion() throws RyaDAOException {
        return "1.0";
    }

    public RyaQueryEngine<MongoDBRdfConfiguration> getQueryEngine() {
        return queryEngine;
    }

    public RyaNamespaceManager<MongoDBRdfConfiguration> getNamespaceManager() {
        return nameSpaceManager;
    }

    public void purge(RdfCloudTripleStoreConfiguration configuration) {
        // TODO Auto-generated method stub
    }

    /**
     * Drops the entire database and releases resources. Destructive and
     * irreversible - intended for tests and administrative teardown only.
     */
    public void dropAndDestroy() throws RyaDAOException {
        db.dropDatabase(); // this is dangerous!
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java new file mode 100644 index 0000000..270b57f --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBNamespaceManager.java @@ -0,0 +1,15 @@ +package mvm.rya.mongodb.dao; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.persist.query.RyaQuery; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +public interface MongoDBNamespaceManager extends RyaNamespaceManager<MongoDBRdfConfiguration>{ + + public void createIndices(DBCollection coll); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java new file mode 100644 index 0000000..093f2dd --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/MongoDBStorageStrategy.java @@ -0,0 +1,21 @@ +package mvm.rya.mongodb.dao; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.query.RyaQuery; + +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +public interface MongoDBStorageStrategy { + + public DBObject getQuery(RyaStatement stmt); + + public RyaStatement deserializeDBObject(DBObject queryResult); + + public DBObject serialize(RyaStatement statement); + + public DBObject getQuery(RyaQuery ryaQuery); + + public void 
createIndices(DBCollection coll); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java new file mode 100644 index 0000000..1847b94 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java @@ -0,0 +1,169 @@ +package mvm.rya.mongodb.dao; + +import info.aduna.iteration.CloseableIteration; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.persist.query.RyaQuery; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +import org.apache.commons.codec.binary.Hex; +import org.openrdf.model.Namespace; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; + +public class SimpleMongoDBNamespaceManager implements MongoDBNamespaceManager { + + public class NamespaceImplementation implements Namespace { + + private String namespace; + private String prefix; + + public NamespaceImplementation(String namespace, String prefix) { + this.namespace = namespace; + this.prefix = prefix; + } + + @Override + public int compareTo(Namespace o) { + if (!namespace.equalsIgnoreCase(o.getName())) return namespace.compareTo(o.getName()); + if (!prefix.equalsIgnoreCase(o.getPrefix())) return prefix.compareTo(o.getPrefix()); + return 0; + } + + 
@Override + public String getName() { + return namespace; + } + + @Override + public String getPrefix() { + return prefix; + } + + } + + public class MongoCursorIteration implements + CloseableIteration<Namespace, RyaDAOException> { + private DBCursor cursor; + + public MongoCursorIteration(DBCursor cursor2) { + this.cursor = cursor2; + } + + @Override + public boolean hasNext() throws RyaDAOException { + return cursor.hasNext(); + } + + @Override + public Namespace next() throws RyaDAOException { + DBObject ns = cursor.next(); + Map values = ns.toMap(); + String namespace = (String) values.get(NAMESPACE); + String prefix = (String) values.get(PREFIX); + + Namespace temp = new NamespaceImplementation(namespace, prefix); + return temp; + } + + @Override + public void remove() throws RyaDAOException { + next(); + } + + @Override + public void close() throws RyaDAOException { + cursor.close(); + } + + } + + private static final String ID = "_id"; + private static final String PREFIX = "prefix"; + private static final String NAMESPACE = "namespace"; + private MongoDBRdfConfiguration conf; + private DBCollection nsColl; + + + public SimpleMongoDBNamespaceManager(DBCollection nameSpaceCollection) { + nsColl = nameSpaceCollection; + } + + @Override + public void createIndices(DBCollection coll){ + coll.createIndex(PREFIX); + coll.createIndex(NAMESPACE); + } + + + @Override + public void setConf(MongoDBRdfConfiguration paramC) { + this.conf = paramC; + } + + @Override + public MongoDBRdfConfiguration getConf() { + // TODO Auto-generated method stub + return conf; + } + + @Override + public void addNamespace(String prefix, String namespace) + throws RyaDAOException { + String id = prefix; + byte[] bytes = id.getBytes(); + try { + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + bytes = digest.digest(bytes); + } catch (NoSuchAlgorithmException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + BasicDBObject doc = new BasicDBObject(ID, new 
String(Hex.encodeHex(bytes))) + .append(PREFIX, prefix) + .append(NAMESPACE, namespace); + nsColl.insert(doc); + + } + + @Override + public String getNamespace(String prefix) throws RyaDAOException { + DBObject query = new BasicDBObject().append(PREFIX, prefix); + DBCursor cursor = nsColl.find(query); + String nameSpace = prefix; + while (cursor.hasNext()){ + DBObject obj = cursor.next(); + nameSpace = (String) obj.toMap().get(NAMESPACE); + } + return nameSpace; + } + + @Override + public void removeNamespace(String prefix) throws RyaDAOException { + DBObject query = new BasicDBObject().append(PREFIX, prefix); + nsColl.remove(query); + } + + @Override + public CloseableIteration<? extends Namespace, RyaDAOException> iterateNamespace() + throws RyaDAOException { + DBObject query = new BasicDBObject(); + DBCursor cursor = nsColl.find(query); + return new MongoCursorIteration(cursor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java new file mode 100644 index 0000000..6de5b89 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java @@ -0,0 +1,132 @@ +package mvm.rya.mongodb.dao; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.query.RyaQuery; + +import org.apache.commons.codec.binary.Hex; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +public class 
SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy {

    private static final String ID = "_id";
    private static final String OBJECT_TYPE = "objectType";
    private static final String CONTEXT = "context";
    private static final String PREDICATE = "predicate";
    private static final String OBJECT = "object";
    private static final String SUBJECT = "subject";
    private ValueFactoryImpl factory = new ValueFactoryImpl();

    public SimpleMongoDBStorageStrategy() {
    }

    /**
     * Creates the single-field and compound indexes used to answer triple
     * pattern queries: (s), (p), (s,p), (o,oType,p), (o,oType), (o,oType,s).
     */
    @Override
    public void createIndices(DBCollection coll) {
        coll.createIndex(SUBJECT);
        coll.createIndex(PREDICATE);
        BasicDBObject doc = new BasicDBObject();
        doc.put(SUBJECT, 1);
        doc.put(PREDICATE, 1);
        coll.createIndex(doc);
        doc = new BasicDBObject(OBJECT, 1);
        doc.put(OBJECT_TYPE, 1);
        doc.put(PREDICATE, 1);
        coll.createIndex(doc);
        doc = new BasicDBObject(OBJECT, 1);
        doc.put(OBJECT_TYPE, 1);
        coll.createIndex(doc);
        // Fixed: previously the OBJECT document was immediately overwritten by
        // a second constructor call, so the (object, objectType, subject)
        // compound index was silently created without the object field.
        doc = new BasicDBObject(OBJECT, 1);
        doc.put(OBJECT_TYPE, 1);
        doc.put(SUBJECT, 1);
        coll.createIndex(doc);
    }

    /**
     * Builds a query document matching whichever of subject/predicate/object/
     * context are non-null in the statement pattern.
     */
    @Override
    public DBObject getQuery(RyaStatement stmt) {
        RyaURI subject = stmt.getSubject();
        RyaURI predicate = stmt.getPredicate();
        RyaType object = stmt.getObject();
        RyaURI context = stmt.getContext();
        BasicDBObject query = new BasicDBObject();
        if (subject != null) {
            query.append(SUBJECT, subject.getData());
        }
        if (object != null) {
            // The object's datatype is stored alongside its lexical form, so
            // both must be matched.
            query.append(OBJECT, object.getData());
            query.append(OBJECT_TYPE, object.getDataType().toString());
        }
        if (predicate != null) {
            query.append(PREDICATE, predicate.getData());
        }
        if (context != null) {
            query.append(CONTEXT, context.getData());
        }

        return query;
    }

    /**
     * Reconstructs a {@link RyaStatement} from a stored document. An object
     * typed as xsd:anyURI becomes a {@link RyaURI}; anything else becomes a
     * typed literal. An empty context (the serialized form of "no context")
     * yields a statement without a context.
     */
    @Override
    public RyaStatement deserializeDBObject(DBObject queryResult) {
        Map result = queryResult.toMap();
        String subject = (String) result.get(SUBJECT);
        String object = (String) result.get(OBJECT);
        String objectType = (String) result.get(OBJECT_TYPE);
        String predicate = (String) result.get(PREDICATE);
        String context = (String) result.get(CONTEXT);
        RyaType objectRya = null;
        if (objectType.equalsIgnoreCase("http://www.w3.org/2001/XMLSchema#anyURI")) {
            objectRya = new RyaURI(object);
        } else {
            objectRya = new RyaType(factory.createURI(objectType), object);
        }

        if (!context.isEmpty()) {
            return new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya,
                    new RyaURI(context));
        }
        return new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya);
    }

    /**
     * Serializes a statement into its document form. The document id is the
     * SHA-1 hex digest of "subject predicate object context" so identical
     * triples collapse to a single document.
     */
    @Override
    public DBObject serialize(RyaStatement statement) {
        String context = "";
        if (statement.getContext() != null) {
            context = statement.getContext().getData();
        }
        String id = statement.getSubject().getData() + " "
                + statement.getPredicate().getData() + " " + statement.getObject().getData() + " " + context;
        byte[] bytes = id.getBytes();
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            bytes = digest.digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required by the Java platform specification; was
            // previously swallowed, which would have stored a wrong id.
            throw new RuntimeException(e);
        }
        BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes)))
                .append(SUBJECT, statement.getSubject().getData())
                .append(PREDICATE, statement.getPredicate().getData())
                .append(OBJECT, statement.getObject().getData())
                .append(OBJECT_TYPE, statement.getObject().getDataType().toString())
                .append(CONTEXT, context);
        return doc;
    }

    @Override
    public DBObject getQuery(RyaQuery ryaQuery) {
        return getQuery(ryaQuery.getQuery());
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java new
file mode 100644 index 0000000..48f0931 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java @@ -0,0 +1,37 @@ +package mvm.rya.mongodb.iter; + +import java.util.Iterator; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; + +public class NonCloseableRyaStatementCursorIterator implements Iterator<RyaStatement> { + + RyaStatementCursorIterator iterator; + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public RyaStatement next() { + return iterator.next(); + } + + public NonCloseableRyaStatementCursorIterator( + RyaStatementCursorIterator iterator) { + this.iterator = iterator; + } + + @Override + public void remove() { + try { + iterator.remove(); + } catch (RyaDAOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java new file mode 100644 index 0000000..b699d96 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java @@ -0,0 +1,88 @@ +package mvm.rya.mongodb.iter; + +import info.aduna.iteration.CloseableIteration; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.mongodb.dao.MongoDBStorageStrategy; + +import org.openrdf.query.BindingSet; + +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; + 
+public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> { + + private DBCollection coll; + private Map<DBObject, BindingSet> rangeMap; + private Iterator<DBObject> queryIterator; + private Long maxResults; + private DBCursor currentCursor; + private BindingSet currentBindingSet; + private MongoDBStorageStrategy strategy; + + public RyaStatementBindingSetCursorIterator(DBCollection coll, + Map<DBObject, BindingSet> rangeMap, MongoDBStorageStrategy strategy) { + this.coll = coll; + this.rangeMap = rangeMap; + this.queryIterator = rangeMap.keySet().iterator(); + this.strategy = strategy; + } + + @Override + public boolean hasNext() { + if (!currentCursorIsValid()) { + findNextValidCursor(); + } + return currentCursorIsValid(); + } + + @Override + public Entry<RyaStatement, BindingSet> next() { + if (!currentCursorIsValid()) { + findNextValidCursor(); + } + if (currentCursorIsValid()) { + // convert to Rya Statement + DBObject queryResult = currentCursor.next(); + RyaStatement statement = strategy.deserializeDBObject(queryResult); + return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, currentBindingSet); + } + return null; + } + + private void findNextValidCursor() { + while (queryIterator.hasNext()){ + DBObject currentQuery = queryIterator.next(); + currentCursor = coll.find(currentQuery); + currentBindingSet = rangeMap.get(currentQuery); + if (currentCursor.hasNext()) break; + } + } + + private boolean currentCursorIsValid() { + return (currentCursor != null) && currentCursor.hasNext(); + } + + + public void setMaxResults(Long maxResults) { + this.maxResults = maxResults; + } + + @Override + public void close() throws RyaDAOException { + // TODO don't know what to do here + } + + @Override + public void remove() throws RyaDAOException { + next(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
package mvm.rya.mongodb.iter;

import info.aduna.iteration.CloseableIteration;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;

import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.persist.RyaDAOException;

import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.collect.CloseableIterator;
import org.openrdf.query.BindingSet;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

/**
 * Thin {@link CloseableIterable} wrapper around a single
 * {@link NonCloseableRyaStatementCursorIterator}. Note this hands out the
 * same iterator on every {@link #iterator()} call, and both close methods
 * are currently no-ops.
 */
public class RyaStatementCursorIterable implements CloseableIterable<RyaStatement> {

    private NonCloseableRyaStatementCursorIterator statementIterator;

    /** Wraps the supplied iterator; no copy is made. */
    public RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) {
        this.statementIterator = iterator;
    }

    /** Returns the wrapped iterator itself (single-use). */
    @Override
    public Iterator<RyaStatement> iterator() {
        return statementIterator;
    }

    @Override
    public void closeQuietly() {
        // TODO nothing to release yet
    }

    @Override
    public void close() throws IOException {
        // TODO nothing to release yet
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
diff --git
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java new file mode index 0000000..8b2ae3b
package mvm.rya.mongodb.iter;

import info.aduna.iteration.CloseableIteration;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;

import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.mongodb.dao.MongoDBStorageStrategy;

import org.calrissian.mango.collect.CloseableIterable;
import org.openrdf.query.BindingSet;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

/**
 * Iterates {@link RyaStatement}s produced by executing a set of Mongo
 * queries against one collection, one cursor at a time, deserializing each
 * result document via the configured {@link MongoDBStorageStrategy}.
 */
public class RyaStatementCursorIterator implements CloseableIteration<RyaStatement, RyaDAOException> {

    private DBCollection coll;
    private Iterator<DBObject> queryIterator;
    private DBCursor currentCursor;
    private MongoDBStorageStrategy strategy;
    private Long maxResults; // TODO currently stored but not enforced

    public RyaStatementCursorIterator(DBCollection coll, Set<DBObject> queries, MongoDBStorageStrategy strategy) {
        this.coll = coll;
        this.queryIterator = queries.iterator();
        this.strategy = strategy;
    }

    @Override
    public boolean hasNext() {
        if (!currentCursorIsValid()) {
            findNextValidCursor();
        }
        return currentCursorIsValid();
    }

    /**
     * Returns the next statement, or {@code null} when exhausted (existing
     * behavior preserved - callers should check {@link #hasNext()} first).
     */
    @Override
    public RyaStatement next() {
        if (!currentCursorIsValid()) {
            findNextValidCursor();
        }
        if (currentCursorIsValid()) {
            // convert to Rya Statement
            DBObject queryResult = currentCursor.next();
            RyaStatement statement = strategy.deserializeDBObject(queryResult);
            return statement;
        }
        return null;
    }

    // Advances to the next query whose cursor has at least one result.
    private void findNextValidCursor() {
        while (queryIterator.hasNext()) {
            DBObject currentQuery = queryIterator.next();
            currentCursor = coll.find(currentQuery);
            if (currentCursor.hasNext()) break;
        }
    }

    private boolean currentCursorIsValid() {
        return (currentCursor != null) && currentCursor.hasNext();
    }

    public void setMaxResults(Long maxResults) {
        this.maxResults = maxResults;
    }

    @Override
    public void close() throws RyaDAOException {
        // Fixed: previously a no-op, leaking the open server-side cursor.
        if (currentCursor != null) {
            currentCursor.close();
        }
    }

    @Override
    public void remove() throws RyaDAOException {
        // NOTE(review): this skips the next element rather than removing
        // anything from the store - confirm this is intentional.
        next();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/pom.xml
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>mvm.rya</groupId>
        <artifactId>parent</artifactId>
        <version>3.2.9</version>
    </parent>
    <artifactId>rya.dao</artifactId>
    <packaging>pom</packaging>
    <name>${project.groupId}.${project.artifactId}</name>
    <modules>
        <module>accumulo.rya</module>
        <module>mongodb.rya</module>
    </modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
<groupId>mvm.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>3.0.4-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>cloudbase.rya.giraph</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph</artifactId> + <version>0.2-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>cloudbase</groupId> + <artifactId>cloudbase-core</artifactId> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.rya</artifactId> + </dependency> + </dependencies> + <profiles> + <profile> + <id>mr</id> + <build> + <plugins> + <plugin> + <!-- NOTE: We don't need a groupId specification because the group is + org.apache.maven.plugins ...which is assumed by default. --> + <artifactId>maven-assembly-plugin</artifactId> + <dependencies> + <dependency> + <groupId>mvm.cloud</groupId> + <artifactId>hadoop-job-assembly</artifactId> + <version>1.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <attach>false</attach> + <descriptors> + <descriptor>assemblies/job.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/extras/cloudbase.rya.giraph/src/main/java/mvm/rya/cloudbase/giraph/format/CloudbaseRyaVertexInputFormat.java 
package mvm.rya.cloudbase.giraph.format;

import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import com.google.common.collect.Maps;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.triple.TripleRow;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Map;

/**
 * VertexInputFormat that reads Rya triples from a Cloudbase table and turns
 * each triple into one vertex: the subject is the vertex id and the triple
 * contributes a single outgoing edge (predicate id, object value).
 *
 * Date: 7/27/12
 * Time: 1:39 PM
 */
public class CloudbaseRyaVertexInputFormat
        extends CloudbaseVertexInputFormat<Text, Text, Text, Text> {

    /**
     * Creates a vertex reader backed by the wrapped CloudbaseInputFormat's
     * record reader for the given split.
     *
     * @param split input split to read
     * @param context task attempt context
     * @return reader producing one vertex per Cloudbase key/value pair
     * @throws IOException if the underlying record reader cannot be created
     */
    @Override
    public VertexReader<Text, Text, Text, Text> createVertexReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        try {
            // The anonymous subclass braces in the original added nothing;
            // instantiate the reader directly.
            return new CloudbaseEdgeVertexReader(
                    cloudbaseInputFormat.createRecordReader(split, context));
        } catch (InterruptedException e) {
            // Restore the interrupt status before translating the exception.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Reader that takes key/value pairs from the underlying input format and
     * deserializes each key into a RyaStatement to build the vertex.
     */
    public static class CloudbaseEdgeVertexReader
            extends CloudbaseVertexReader<Text, Text, Text, Text> {

        // Shared Rya context used to deserialize triples from table rows.
        private final RyaContext ryaContext = RyaContext.getInstance();

        public CloudbaseEdgeVertexReader(RecordReader<Key, Value> recordReader) {
            super(recordReader);
        }

        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
            return getRecordReader().nextKeyValue();
        }

        /**
         * Builds the current vertex from the current key. The key's row,
         * column family, and column qualifier are deserialized as a triple
         * using the SPO layout; subject becomes the vertex id, predicate the
         * edge id, and object the edge value.
         *
         * @return the vertex for the current key/value pair
         * @throws IOException if deserialization or vertex creation fails
         */
        @Override
        public Vertex<Text, Text, Text, Text> getCurrentVertex()
                throws IOException, InterruptedException {
            try {
                Key key = getRecordReader().getCurrentKey();
                // TODO: assumes the SPO table layout for now.
                RyaStatement ryaStatement = ryaContext.deserializeTriple(
                        RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
                        new TripleRow(key.getRow().getBytes(),
                                key.getColumnFamily().getBytes(),
                                key.getColumnQualifier().getBytes()));
                Vertex<Text, Text, Text, Text> vertex =
                        BspUtils.<Text, Text, Text, Text>createVertex(
                                getContext().getConfiguration());
                Text vertexId = new Text(ryaStatement.getSubject().getData());
                Map<Text, Text> edges = Maps.newHashMap();
                edges.put(new Text(ryaStatement.getPredicate().getData()),
                        new Text(ryaStatement.getObject().getData()));
                vertex.initialize(vertexId, new Text(), edges, null);
                return vertex;
            } catch (Exception e) {
                // Surface deserialization failures with their cause attached.
                throw new IOException(e);
            }
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package mvm.rya.cloudbase.giraph.format;

import cloudbase.core.data.Mutation;
import cloudbase.core.data.Value;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.cloudbase.RyaTableMutationsFactory;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

/**
 * VertexOutputFormat that writes vertices back to a Rya Cloudbase table.
 * Each edge of a vertex is serialized as an SPO triple: vertex id = subject,
 * edge target id = predicate, edge value = object.
 */
public class CloudbaseRyaVertexOutputFormat
        extends CloudbaseVertexOutputFormat<Text, Text, Text> {

    /**
     * Creates a vertex writer for the table named by the OUTPUT_TABLE
     * configuration key.
     *
     * @param context task attempt context supplying the configuration
     * @return writer that serializes vertices into Rya mutations
     * @throws IOException if OUTPUT_TABLE was not set in the configuration
     */
    @Override
    public VertexWriter<Text, Text, Text> createVertexWriter(
            TaskAttemptContext context) throws IOException, InterruptedException {
        RecordWriter<Text, Mutation> writer =
                cloudbaseOutputFormat.getRecordWriter(context);
        String tableName = context.getConfiguration().get(OUTPUT_TABLE);
        if (tableName == null) {
            throw new IOException("Forgot to set table name "
                    + "using CloudbaseVertexOutputFormat.OUTPUT_TABLE");
        }
        return new CloudbaseEdgeVertexWriter(writer, tableName);
    }

    /**
     * Wraps the Cloudbase RecordWriter: each vertex edge becomes a set of
     * Rya mutations, of which the SPO layout is written to the table.
     */
    public static class CloudbaseEdgeVertexWriter
            extends CloudbaseVertexWriter<Text, Text, Text> {

        // Factory shared by all writer instances to build triple mutations.
        public static final RyaTableMutationsFactory RYA_TABLE_MUTATIONS_FACTORY =
                new RyaTableMutationsFactory();

        // Destination table, fixed at construction time.
        // (The unused CF/PARENT fields from the original were removed.)
        private final Text tableName;

        public CloudbaseEdgeVertexWriter(
                RecordWriter<Text, Mutation> writer, String tableName) {
            super(writer);
            this.tableName = new Text(tableName);
        }

        /**
         * Writes one SPO mutation set per edge of the vertex, treating
         * (vertexId, targetVertexId, edgeValue) as (subject, predicate,
         * object).
         *
         * @param vertex vertex whose edges are persisted
         * @throws IOException if the underlying writer fails
         */
        @Override
        public void writeVertex(Vertex<Text, Text, Text, ?> vertex)
                throws IOException, InterruptedException {
            RecordWriter<Text, Mutation> writer = getRecordWriter();
            Text subj = vertex.getId();
            for (Edge<Text, Text> edge : vertex.getEdges()) {
                Text pred = edge.getTargetVertexId();
                Text obj = edge.getValue();
                Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialized =
                        RYA_TABLE_MUTATIONS_FACTORY.serialize(
                                new RyaURI(subj.toString()),
                                new RyaURI(pred.toString()),
                                new RyaType(obj.toString()), null);
                // TODO: only the SPO layout is written for now.
                Collection<Mutation> mutations =
                        serialized.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
                for (Mutation mut : mutations) {
                    writer.write(tableName, mut);
                }
            }
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package mvm.rya.cloudbase.giraph.format;

import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

/**
 * Class which wraps the CloudbaseInputFormat. It's designed
 * as an extension point for VertexInputFormat subclasses who wish
 * to read from Cloudbase tables.
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge type
 * @param <M> message type
 */
public abstract class CloudbaseVertexInputFormat<
        I extends WritableComparable,
        V extends Writable,
        E extends Writable,
        M extends Writable>
        extends VertexInputFormat<I, V, E, M> implements Configurable {

    /** Delegate input format for all Cloudbase operations. */
    protected CloudbaseInputFormat cloudbaseInputFormat =
            new CloudbaseInputFormat();

    /** Configured and injected by the job. */
    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Abstract class which provides a template for instantiating vertices
     * from Cloudbase Key/Value pairs.
     *
     * @param <I> vertex id type
     * @param <V> vertex value type
     * @param <E> edge type
     * @param <M> message type
     */
    public abstract static class CloudbaseVertexReader<
            I extends WritableComparable,
            V extends Writable, E extends Writable, M extends Writable>
            implements VertexReader<I, V, E, M> {

        /** Used by subclasses to read key/value pairs. */
        private final RecordReader<Key, Value> reader;
        /** Context passed to initialize. */
        private TaskAttemptContext context;

        /**
         * Constructor used to pass the record reader instance.
         *
         * @param reader Cloudbase record reader
         */
        public CloudbaseVertexReader(RecordReader<Key, Value> reader) {
            this.reader = reader;
        }

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            reader.initialize(inputSplit, context);
            this.context = context;
        }

        /** Closes the underlying record reader. */
        @Override
        public void close() throws IOException {
            reader.close();
        }

        /** @return progress of the underlying record reader */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return reader.getProgress();
        }

        /** @return record reader to be used for reading */
        protected RecordReader<Key, Value> getRecordReader() {
            return reader;
        }

        /** @return context passed to initialize */
        protected TaskAttemptContext getContext() {
            return context;
        }
    }

    /**
     * Delegates split computation to the wrapped CloudbaseInputFormat.
     *
     * @param context context of the job
     * @param numWorkers number of workers used for this job (unused by the
     *                   Cloudbase delegate)
     * @return tablet splits
     * @throws IOException if the delegate fails; the "input info not set"
     *         case is rewrapped with a hint about the required static setters
     */
    @Override
    public List<InputSplit> getSplits(JobContext context, int numWorkers)
            throws IOException, InterruptedException {
        try {
            return cloudbaseInputFormat.getSplits(context);
        } catch (IOException e) {
            String msg = e.getMessage();
            if (msg != null && msg.contains("Input info has not been set")) {
                // Preserve the original exception as the cause.
                throw new IOException(msg
                        + " Make sure you initialized"
                        + " CloudbaseInputFormat static setters "
                        + "before passing the config to GiraphJob.", e);
            }
            // BUG FIX: previously any other IOException was silently
            // swallowed and null was returned; now it propagates.
            throw e;
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package mvm.rya.cloudbase.giraph.format;

import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
import cloudbase.core.data.Mutation;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * Class which wraps the CloudbaseOutputFormat. It's designed
 * as an extension point to VertexOutputFormat subclasses who wish
 * to write vertices back to a Cloudbase table.
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge type
 */
public abstract class CloudbaseVertexOutputFormat<
        I extends WritableComparable,
        V extends Writable,
        E extends Writable>
        extends VertexOutputFormat<I, V, E> implements Configurable {

    /** Output table parameter. */
    protected static final String OUTPUT_TABLE = "OUTPUT_TABLE";

    /** Cloudbase delegate for table output. */
    protected CloudbaseOutputFormat cloudbaseOutputFormat =
            new CloudbaseOutputFormat();

    /** Used by the Configurable interface. */
    private Configuration conf;

    /**
     * Main abstraction point for vertex writers to persist back
     * to Cloudbase tables.
     *
     * @param <I> vertex id type
     * @param <V> vertex value type
     * @param <E> edge type
     */
    public abstract static class CloudbaseVertexWriter<
            I extends WritableComparable,
            V extends Writable,
            E extends Writable>
            implements VertexWriter<I, V, E> {

        /** Task attempt context. */
        private TaskAttemptContext context;

        /** Cloudbase record writer. */
        private final RecordWriter<Text, Mutation> recordWriter;

        /**
         * Constructor for use with subclasses.
         *
         * @param recordWriter Cloudbase record writer
         */
        public CloudbaseVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
            this.recordWriter = recordWriter;
        }

        /**
         * Stores the context used to write the vertices.
         *
         * @param context context used to write the vertices
         */
        @Override
        public void initialize(TaskAttemptContext context) throws IOException {
            this.context = context;
        }

        /** Closes the underlying record writer. */
        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            recordWriter.close(context);
        }

        /** @return record writer to be used for writing */
        public RecordWriter<Text, Mutation> getRecordWriter() {
            return recordWriter;
        }

        /** @return context passed to initialize */
        public TaskAttemptContext getContext() {
            return context;
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Delegates the output check to the wrapped CloudbaseOutputFormat.
     *
     * @param context information about the job
     * @throws IOException if the delegate fails; the "output info not set"
     *         case is rewrapped with a hint about the required static setters
     */
    @Override
    public void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException {
        try {
            cloudbaseOutputFormat.checkOutputSpecs(context);
        } catch (IOException e) {
            String msg = e.getMessage();
            if (msg != null && msg.contains("Output info has not been set")) {
                // Preserve the original exception as the cause.
                throw new IOException(msg + " Make sure you initialized"
                        + " CloudbaseOutputFormat static setters "
                        + "before passing the config to GiraphJob.", e);
            }
            // BUG FIX: previously any other IOException was silently
            // swallowed; now it propagates.
            throw e;
        }
    }

    /**
     * Delegates committer creation to the wrapped CloudbaseOutputFormat.
     *
     * @param context the task context
     * @return the delegate's OutputCommitter
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return cloudbaseOutputFormat.getOutputCommitter(context);
    }
}
package mvm.rya.cloudbase.giraph.format;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * Debugging output format: prints each vertex to standard out and performs
 * no commit work at all.
 *
 * Date: 7/27/12
 * Time: 2:58 PM
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge type
 */
public class PrintVertexOutputFormat<
        I extends WritableComparable,
        V extends Writable,
        E extends Writable>
        extends VertexOutputFormat<I, V, E> implements Configurable {

    @Override
    public void setConf(Configuration entries) {
        // Configuration is ignored; there is nothing to configure.
    }

    @Override
    public Configuration getConf() {
        // No configuration is held.
        return null;
    }

    /**
     * Returns a writer whose only action is printing each vertex with
     * {@code System.out.println}.
     */
    @Override
    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new VertexWriter<I, V, E>() {
            @Override
            public void initialize(TaskAttemptContext ctx) throws IOException {
                // Nothing to set up.
            }

            @Override
            public void writeVertex(Vertex<I, V, E, ?> vertex)
                    throws IOException, InterruptedException {
                // The vertex's own toString() is the output record.
                System.out.println(vertex);
            }

            @Override
            public void close(TaskAttemptContext ctx)
                    throws IOException, InterruptedException {
                // Nothing to release.
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException {
        // Printing to stdout requires no output specification.
    }

    /**
     * Returns a committer with no work: every lifecycle phase is a no-op and
     * no task ever needs a commit.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new OutputCommitter() {
            @Override
            public void setupJob(JobContext jobContext) throws IOException {
                // No-op.
            }

            @Override
            public void cleanupJob(JobContext jobContext) throws IOException {
                // No-op.
            }

            @Override
            public void setupTask(TaskAttemptContext attempt) throws IOException {
                // No-op.
            }

            @Override
            public boolean needsTaskCommit(TaskAttemptContext attempt)
                    throws IOException {
                // Nothing is ever written through the committer.
                return false;
            }

            @Override
            public void commitTask(TaskAttemptContext attempt) throws IOException {
                // No-op.
            }

            @Override
            public void abortTask(TaskAttemptContext attempt) throws IOException {
                // No-op.
            }
        };
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package mvm.rya.cloudbase.giraph.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.zk.ZooKeeperExt;

import junit.framework.TestCase;

/**
 * Duplicate copy from main giraph trunk. At least until there
 * is a maven test artifact for Giraph.
 *
 * Extended TestCase for setting up Bsp testing.
 */
public class BspCase extends TestCase implements Watcher {
    /** JobTracker system property */
    private final String jobTracker =
        System.getProperty("prop.mapred.job.tracker");
    /** Jar location system property */
    private final String jarLocation =
        System.getProperty("prop.jarLocation", "");
    /** Number of actual processes for the BSP application */
    private int numWorkers = 1;
    /** ZooKeeper list system property */
    private final String zkList = System.getProperty("prop.zookeeper.list");

    /**
     * Adjust the configuration to the basic test case: local job runner by
     * default, or a real JobTracker when prop.mapred.job.tracker is set.
     *
     * @param job job whose configuration is adjusted
     */
    public final void setupConfiguration(GiraphJob job) {
        Configuration conf = job.getConfiguration();
        conf.set("mapred.jar", getJarLocation());

        // Allow this test to be run on a real Hadoop setup
        if (getJobTracker() != null) {
            System.out.println("setup: Sending job to job tracker " +
                    getJobTracker() + " with jar path " + getJarLocation()
                    + " for " + getName());
            conf.set("mapred.job.tracker", getJobTracker());
            job.setWorkerConfiguration(getNumWorkers(),
                    getNumWorkers(),
                    100.0f);
        } else {
            System.out.println("setup: Using local job runner with " +
                    "location " + getJarLocation() + " for " +
                    getName());
            job.setWorkerConfiguration(1, 1, 100.0f);
            // Single node testing
            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
        }
        conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
        conf.setInt(GiraphJob.POLL_MSECS, 3 * 1000);
        conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
        if (getZooKeeperList() != null) {
            job.setZooKeeperConfiguration(getZooKeeperList());
        }
        // GeneratedInputSplit will generate 5 vertices
        conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
    }

    /**
     * Create the test case.
     *
     * @param testName name of the test case
     */
    public BspCase(String testName) {
        super(testName);
    }

    /** @return number of workers used in the BSP application */
    public int getNumWorkers() {
        return numWorkers;
    }

    /** @return the ZooKeeper list, or null if not configured */
    public String getZooKeeperList() {
        return zkList;
    }

    /** @return location of the jar file */
    String getJarLocation() {
        return jarLocation;
    }

    /** @return job tracker location as a string, or null for local mode */
    String getJobTracker() {
        return jobTracker;
    }

    /**
     * Get the single part file status and make sure there is only one part.
     *
     * @param job Job to get the file system from
     * @param partDirPath Directory where the single part file should exist
     * @return Single part file status
     * @throws java.io.IOException on file system errors
     */
    public static FileStatus getSinglePartFileStatus(GiraphJob job,
            Path partDirPath) throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        FileStatus singlePartFileStatus = null;
        int partFiles = 0;
        for (FileStatus fileStatus : fs.listStatus(partDirPath)) {
            String name = fileStatus.getPath().getName();
            if (name.equals("part-m-00000")) {
                singlePartFileStatus = fileStatus;
            }
            if (name.startsWith("part-m-")) {
                ++partFiles;
            }
        }
        if (partFiles != 1) {
            // NOTE(review): ArithmeticException is an odd exception type for
            // this condition, but it is kept for compatibility with the
            // original giraph copy this class duplicates.
            throw new ArithmeticException(
                    "getSinglePartFile: Part file count should be 1, but is " +
                            partFiles);
        }
        return singlePartFileStatus;
    }

    /**
     * Removes stale local-job artifacts (HDFS paths and ZooKeeper nodes)
     * left over from prior runs, and bumps the worker count when a real
     * JobTracker is available.
     */
    @Override
    public void setUp() {
        if (jobTracker != null) {
            System.out.println("Setting tasks to 3 for " + getName() +
                    " since JobTracker exists...");
            numWorkers = 3;
        }
        try {
            Configuration conf = new Configuration();
            FileSystem hdfs = FileSystem.get(conf);
            // Since local jobs always use the same paths, remove them
            Path oldLocalJobPaths = new Path(
                    GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
            try {
                for (FileStatus fileStatus : hdfs.listStatus(oldLocalJobPaths)) {
                    if (fileStatus.isDir() &&
                            fileStatus.getPath().getName().contains("job_local")) {
                        System.out.println("Cleaning up local job path " +
                                fileStatus.getPath().getName());
                        // BUG FIX: delete only the matched stale job
                        // directory; the original deleted the whole parent
                        // directory (oldLocalJobPaths) on the first match.
                        hdfs.delete(fileStatus.getPath(), true);
                    }
                }
            } catch (FileNotFoundException e) {
                // ignore this FileNotFound exception and continue.
            }
            if (zkList == null) {
                return;
            }
            ZooKeeperExt zooKeeperExt =
                    new ZooKeeperExt(zkList, 30 * 1000, this);
            List<String> rootChildren = zooKeeperExt.getChildren("/", false);
            for (String rootChild : rootChildren) {
                if (rootChild.startsWith("_hadoopBsp")) {
                    List<String> children =
                            zooKeeperExt.getChildren("/" + rootChild, false);
                    for (String child : children) {
                        if (child.contains("job_local_")) {
                            // BUG FIX: build the delete path from the actual
                            // root child that was listed; the original
                            // hard-coded "/_hadoopBsp/" even though rootChild
                            // only startsWith("_hadoopBsp").
                            String path = "/" + rootChild + "/" + child;
                            System.out.println("Cleaning up " + path);
                            zooKeeperExt.deleteExt(path, -1, true);
                        }
                    }
                }
            }
            zooKeeperExt.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // Do nothing
    }

    /**
     * Helper method to remove an old output directory if it exists,
     * and set the output path for any VertexOutputFormat that uses
     * FileOutputFormat.
     *
     * @param job Job to set the output path for
     * @param outputPath path to clear and set as output
     * @throws java.io.IOException on file system errors
     */
    public static void removeAndSetOutput(GiraphJob job,
            Path outputPath) throws IOException {
        remove(job.getConfiguration(), outputPath);
        FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
    }

    /**
     * Helper method to remove a path if it exists.
     *
     * @param conf Configuration to load FileSystem from
     * @param path Path to remove
     * @throws java.io.IOException on file system errors
     */
    public static void remove(Configuration conf, Path path)
            throws IOException {
        FileSystem hdfs = FileSystem.get(conf);
        hdfs.delete(path, true);
    }

    /** @return the name of the method that called the current method */
    public static String getCallingMethodName() {
        return Thread.currentThread().getStackTrace()[2].getMethodName();
    }
}
+ */ + +package mvm.rya.cloudbase.giraph.format; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.Connector; +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.client.mapreduce.CloudbaseInputFormat; +import cloudbase.core.client.mock.MockInstance; +import cloudbase.core.data.Range; +import cloudbase.core.security.Authorizations; +import junit.framework.Test; +import junit.framework.TestSuite; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.cloudbase.CloudbaseRdfConfiguration; +import mvm.rya.cloudbase.CloudbaseRyaDAO; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.lib.TextVertexOutputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +/* + Test class for Cloudbase vertex input/output formats. + */ +public class TestCloudbaseVertexFormat extends BspCase { + + private final String TABLE_NAME = "rya_spo"; + private final String INSTANCE_NAME = "stratus"; + private final Text FAMILY = new Text("cf"); + private final Text CHILDREN = new Text("children"); + private final String USER = "root"; + private final byte[] PASSWORD = new byte[]{}; + private final Text OUTPUT_FIELD = new Text("parent"); + + + private final Logger log = Logger.getLogger(TestCloudbaseVertexFormat.class); + + /** + * Create the test case + * + * @param testName name of the test case + */ + public TestCloudbaseVertexFormat(String testName) { + super(testName); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() { + return new TestSuite(TestCloudbaseVertexFormat.class); + + } + + /* + Write a simple parent-child directed graph to Cloudbase. 
+ Run a job which reads the values + into subclasses that extend CloudbaseVertex I/O formats. + Check the output after the job. + */ + public void testCloudbaseInputOutput() throws Exception { +// if (System.getProperty("prop.mapred.job.tracker") != null) { +// if(log.isInfoEnabled()) +// log.info("testCloudbaseInputOutput: " + +// "Ignore this test if not local mode."); +// return; +// } +// +// System.setProperty("prop.jarLocation", "/temp/cloudbase.rya.giraph-3.0.0-SNAPSHOT.jar"); + File jarTest = new File(System.getProperty("prop.jarLocation")); + if (!jarTest.exists()) { + fail("Could not find Giraph jar at " + + "location specified by 'prop.jarLocation'. " + + "Make sure you built the main Giraph artifact?."); + } + + //Write out vertices and edges out to a mock instance. +// MockInstance mockInstance = new MockInstance(INSTANCE_NAME); + Connector c = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()); + CloudbaseRyaDAO ryaDAO = new CloudbaseRyaDAO(); + ryaDAO.setConnector(c); + CloudbaseRdfConfiguration cloudbaseRdfConfiguration = new CloudbaseRdfConfiguration(); +// cloudbaseRdfConfiguration.setTablePrefix("test_"); + ryaDAO.init(); +// c.tableOperations().create(TABLE_NAME); +// BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4); + + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred1"), + new RyaURI("urn:test#obj1"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred2"), + new RyaURI("urn:test#obj2"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred3"), + new RyaURI("urn:test#obj3"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred4"), + new RyaURI("urn:test#obj4"))); + ryaDAO.commit(); + +// Mutation m1 = new Mutation(new Text("0001")); +// m1.put(FAMILY, CHILDREN, new Value("0002".getBytes())); +// bw.addMutation(m1); +// +// Mutation m2 = new 
Mutation(new Text("0002")); +// m2.put(FAMILY, CHILDREN, new Value("0003".getBytes())); +// bw.addMutation(m2); +// if(log.isInfoEnabled()) +// log.info("Writing mutations to Cloudbase table"); +// bw.close(); + + Configuration conf = new Configuration(); +// conf.set(CloudbaseVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME); + + /* + Very important to initialize the formats before + sending configuration to the GiraphJob. Otherwise + the internally constructed Job in GiraphJob will + not have the proper context initialization. + */ + GiraphJob job = new GiraphJob(conf, getCallingMethodName()); + CloudbaseInputFormat.setInputInfo(job.getInternalJob(), USER, "password".getBytes(), + TABLE_NAME, new Authorizations()); +// CloudbaseInputFormat.setMockInstance(job.getInternalJob(), INSTANCE_NAME); + CloudbaseInputFormat.setZooKeeperInstance(job.getInternalJob(), "stratus", "stratus13:2181"); + CloudbaseInputFormat.setRanges(job.getInternalJob(), Collections.singleton(new Range())); + +// CloudbaseOutputFormat.setOutputInfo(job.getInternalJob(), USER, PASSWORD, true, null); +// CloudbaseOutputFormat.setMockInstance(job.getInternalJob(), INSTANCE_NAME); + + setupConfiguration(job); + job.setVertexClass(EdgeNotification.class); + job.setVertexInputFormatClass(CloudbaseRyaVertexInputFormat.class); + job.setVertexOutputFormatClass(PrintVertexOutputFormat.class); + FileOutputFormat.setOutputPath(job.getInternalJob(), new Path("/temp/graphout")); + +// HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>(); +// columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN)); +// CloudbaseInputFormat.fetchColumns(job.getInternalJob(), columnsToFetch); + + if (log.isInfoEnabled()) + log.info("Running edge notification job using Cloudbase input"); + assertTrue(job.run(true)); + +// Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations()); +// scanner.setRange(new Range("0002", "0002")); +// scanner.fetchColumn(FAMILY, OUTPUT_FIELD); +// boolean foundColumn = 
false; +// +// if(log.isInfoEnabled()) +// log.info("Verify job output persisted correctly."); +// //make sure we found the qualifier. +// assertTrue(scanner.iterator().hasNext()); +// +// +// //now we check to make sure the expected value from the job persisted correctly. +// for(Map.Entry<Key,Value> entry : scanner) { +// Text row = entry.getKey().getRow(); +// assertEquals("0002", row.toString()); +// Value value = entry.getValue(); +// assertEquals("0001", ByteBufferUtil.toString( +// ByteBuffer.wrap(value.get()))); +// foundColumn = true; +// } + } + + /* + Test compute method that sends each edge a notification of its parents. + The test set only has a 1-1 parent-to-child ratio for this unit test. + */ + public static class EdgeNotification + extends EdgeListVertex<Text, Text, Text, Text> { + @Override + public void compute(Iterable<Text> messages) throws IOException { + System.out.println("Edges: " + messages); + for (Text message : messages) { + getValue().set(message); + } + if (getSuperstep() == 0) { +// sendMessageToAllEdges(getId()); + } + voteToHalt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.accumulo/pom.xml b/extras/generic.mr/generic.mr.accumulo/pom.xml new file mode 100644 index 0000000..bc03fef --- /dev/null +++ b/extras/generic.mr/generic.mr.accumulo/pom.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>generic.mr</artifactId> + <version>3.2.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>generic.mr.accumulo</artifactId> + 
<name>${project.groupId}.${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>generic.mr.api</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + <artifactId>gmaven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>accumulo</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + <profile> + <id>cloudbase</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>com.texeltek</groupId> + <artifactId>accumulo-cloudbase-shim</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy b/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy new file mode 100644 index 0000000..89d4633 --- /dev/null +++ b/extras/generic.mr/generic.mr.accumulo/src/main/groovy/mvm/rya/generic/mr/accumulo/AccumuloMRInfo.groovy @@ -0,0 +1,146 @@ +package mvm.rya.generic.mr.accumulo + +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat +import org.apache.accumulo.core.data.Key +import org.apache.accumulo.core.data.Mutation +import org.apache.accumulo.core.data.Value +import 
org.apache.accumulo.core.security.Authorizations +import org.apache.accumulo.core.security.ColumnVisibility +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce.Job +import mvm.rya.generic.mr.api.MRInfo +import org.apache.accumulo.core.client.mock.MockInstance +import org.apache.accumulo.core.client.ZooKeeperInstance + +/** + * Date: 12/3/12 + * Time: 9:00 AM + */ +class AccumuloMRInfo implements MRInfo { + + def Configuration conf + def connector; + + @Override + void initMRJob(Job job, String table, String outtable, String[] auths) { + Configuration conf = job.configuration + String username = conf.get(USERNAME) + String password = conf.get(PASSWORD) + String instance = conf.get(INSTANCE) + String zookeepers = conf.get(ZOOKEEPERS) + String mock = conf.get(MOCK) + + //input + if (Boolean.parseBoolean(mock)) { + AccumuloInputFormat.setMockInstance(conf, instance) + AccumuloOutputFormat.setMockInstance(conf, instance) + } else if (zookeepers != null) { + AccumuloInputFormat.setZooKeeperInstance(conf, instance, zookeepers) + AccumuloOutputFormat.setZooKeeperInstance(conf, instance, zookeepers) + } else { + throw new IllegalArgumentException("Must specify either mock or zookeepers"); + } + + AccumuloInputFormat.setInputInfo(conf, username, password.getBytes(), table, new Authorizations(auths)) + job.setInputFormatClass(AccumuloInputFormat.class); + + // OUTPUT + job.setOutputFormatClass(AccumuloOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + AccumuloOutputFormat.setOutputInfo(job, username, password.getBytes(), true, outtable); + } + + @Override + def key(byte[] data) { + Key key = new Key(); + key.readFields(new DataInputStream(new ByteArrayInputStream(data))) + return key + } + + @Override + def key(String row, String cf, String cq, String cv, long timestamp) { + return new Key(row, cf, cq, cv, timestamp) + } + + @Override + def value(byte[] data) { + 
return new Value(data) + } + + @Override + def columnVisibility(String cv) { + return new ColumnVisibility(cv) + } + + @Override + def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val) { + Mutation mutation = new Mutation(row); + mutation.put(cf, cq, columnVisibility(cv), timestamp, value(val)) + return mutation + } + + @Override + def instance() { + assert conf != null + + String instance_str = conf.get(INSTANCE) + String zookeepers = conf.get(ZOOKEEPERS) + String mock = conf.get(MOCK) + if (Boolean.parseBoolean(mock)) { + return new MockInstance(instance_str) + } else if (zookeepers != null) { + return new ZooKeeperInstance(instance_str, zookeepers) + } else { + throw new IllegalArgumentException("Must specify either mock or zookeepers"); + } + } + + @Override + def connector(def instance) { + if (connector != null) return connector + + String username = conf.get(USERNAME) + String password = conf.get(PASSWORD) + if (instance == null) + instance = instance() + connector = instance.getConnector(username, password) + return connector + } + + @Override + def void writeMutations(def connector, String tableName, Iterator mutations) { + def bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4); + mutations.each { m -> + bw.addMutation(m) + } + bw.flush() + bw.close() + } + + @Override + def scanner(def connector, String tableName, String[] auths) { + return connector.createScanner(tableName, new Authorizations(auths)) + } + + @Override + def batchScanner(def connector, String tableName, String[] auths, int numThreads) { + return connector.createBatchScanner(tableName, new Authorizations(auths), numThreads) + } + + @Override + def range(def startKey, def endKey) { + assert startKey != null + + if (endKey != null) + return new org.apache.accumulo.core.data.Range(startKey, endKey) + return new org.apache.accumulo.core.data.Range(startKey) + } + + @Override + def authorizations(String[] auths) { + return new Authorizations(auths) + 
} +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo b/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo new file mode 100644 index 0000000..81d47de --- /dev/null +++ b/extras/generic.mr/generic.mr.accumulo/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo @@ -0,0 +1 @@ +mvm.rya.generic.mr.accumulo.AccumuloMRInfo \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.api/pom.xml b/extras/generic.mr/generic.mr.api/pom.xml new file mode 100644 index 0000000..ec1e47b --- /dev/null +++ b/extras/generic.mr/generic.mr.api/pom.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>generic.mr</artifactId> + <version>3.2.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>generic.mr.api</artifactId> + <name>${project.groupId}.${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-all</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + 
<artifactId>gmaven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy new file mode 100644 index 0000000..bdcc61e --- /dev/null +++ b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfo.groovy @@ -0,0 +1,43 @@ +package mvm.rya.generic.mr.api + +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.mapreduce.Job + +/** + * Date: 12/3/12 + * Time: 8:56 AM + */ +public interface MRInfo extends Configurable{ + + public static final String USERNAME = "username" + public static final String PASSWORD = "password" + public static final String INSTANCE = "instance" + public static final String ZOOKEEPERS = "zookeepers" + public static final String MOCK = "mock" + + def void initMRJob(Job job, String table, String outtable, String[] auths) + + def key(byte[] data); + + def key(String row, String cf, String cq, String cv, long timestamp); + + def value(byte[] data); + + def columnVisibility(String cv); + + def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val); + + def instance() + + def connector(def instance) + + def void writeMutations(def connector, String tableName, Iterator mutations) + + def scanner(def connector, String tableName, String[] auths) + + def batchScanner(def connector, String tableName, String[] auths, int numThreads) + + def range(def startKey, def endKey) + + def authorizations(String[] auths) +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy new file mode 100644 index 0000000..a2b92ec --- /dev/null +++ b/extras/generic.mr/generic.mr.api/src/main/groovy/mvm/rya/generic/mr/api/MRInfoContext.groovy @@ -0,0 +1,28 @@ +package mvm.rya.generic.mr.api + +import org.apache.hadoop.conf.Configuration + +/** + * Date: 12/5/12 + * Time: 1:32 PM + */ +class MRInfoContext { + + private static currentMrInfo; + + public static MRInfo currentMRInfo() { + return currentMRInfo(null); + } + + public static MRInfo currentMRInfo(Configuration config) { + if (currentMrInfo == null) { + def iter = ServiceLoader.load(MRInfo.class, Thread.currentThread().getContextClassLoader()).iterator() + if (iter.hasNext()) { + currentMrInfo = iter.next() + if (config != null) currentMrInfo.setConf(config) + } + } + return currentMrInfo + } + +}
