Hey folks, I am working on a MongoDB persistence manager in CQ. I implemented
all the methods, but the repository shuts down due to an unknown error during
startup. Can anybody help me look at the code?
import com.mongodb.*;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
import org.apache.jackrabbit.core.id.NodeId;
import org.apache.jackrabbit.core.id.PropertyId;
import org.apache.jackrabbit.core.persistence.PMContext;
import
org.apache.jackrabbit.core.persistence.bundle.AbstractBundlePersistenceManager;
import org.apache.jackrabbit.core.persistence.util.*;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.bson.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jcr.RepositoryException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
/**
* Created with IntelliJ IDEA.
* Date: 6/4/12
* Time: 1:55 PM
* To change this template use File | Settings | File Templates.
*/
public class MongoBundlePersistenceManager extends
AbstractBundlePersistenceManager {
protected DB itsDb;
protected DBCollection itsNodeCollection;
/**
* flag for error handling
*/
protected ErrorHandling errorHandling = new ErrorHandling();
protected BundleBinding binding;
/**
* the minimum size of a property until it gets written to the blob
store
*/
private int minBlobSize = 0x1000;
private String itsSchema;
/** the default logger */
private static Logger log =
LoggerFactory.getLogger(org.apache.jackrabbit.core.persistence.mongo.MongoBundlePersistenceManager.class);
protected DBCollection itsRefCollection;
protected BLOBStore itsBlobStore;
@Override
protected NodePropBundle loadBundle(NodeId id) throws
ItemStateException {
BasicDBObject query = new BasicDBObject();
query.put("_id", id.toString());
BasicDBObject theDBObject = (BasicDBObject)
itsNodeCollection.findOne(query);
if (theDBObject == null) {
return null;
}
Binary theD = (Binary) theDBObject.get("d");
InputStream in = new ByteArrayInputStream(theD.getData());
try {
return binding.readBundle(in, id);
} catch (IOException e) {
String msg = "failed to read bundle: " + id + ": " + e;
log.error(msg,e);
throw new ItemStateException(msg, e);
}
}
/** initial size of buffer used to serialize objects */
protected static final int INITIAL_BUFFER_SIZE = 1024;
@Override
protected synchronized void storeBundle(NodePropBundle bundle) throws
ItemStateException {
try {
DBObject theQuery = new BasicDBObject();
theQuery.put("_id", bundle.getId().toString());
DBObject theDbObject = new BasicDBObject();
theDbObject.put("_id", bundle.getId().toString());
ByteArrayOutputStream out = new
ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
binding.writeBundle(out, bundle);
theDbObject.put("d", out.toByteArray());
if( bundle.getParentId() != null) {
theDbObject.put("p", bundle.getParentId().toString());
}
itsNodeCollection.update(theQuery, theDbObject, true, false);
} catch (IOException e) {
String msg = "failed to write bundle: " + bundle.getId() + ": "
+ e;
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
@Override
protected synchronized void destroyBundle(NodePropBundle bundle) throws
ItemStateException {
//todo: delete entire subtree in single query
DBObject theDBObject = new BasicDBObject();
theDBObject.put("_id", bundle.getId().toString());
itsNodeCollection.remove(theDBObject);
}
@Override
protected synchronized void destroy(NodeReferences refs) throws
ItemStateException {
DBObject theDBObject = new BasicDBObject();
theDBObject.put("_id", refs.getTargetId().toString());
itsRefCollection.remove(theDBObject);
}
@Override
protected synchronized void store(NodeReferences refs) throws
ItemStateException {
DBObject theQuery = new BasicDBObject();
theQuery.put("_id", refs.getTargetId().toString());
DBObject theRef = new BasicDBObject();
theRef.put("_id", refs.getTargetId().toString());
ByteArrayOutputStream out = new
ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
// serialize references
try {
org.apache.jackrabbit.core.persistence.util.Serializer.serialize(refs, out);
theRef.put("d", out.toByteArray());
itsRefCollection.update(theQuery, theRef, true, false);
} catch (Exception e) {
String msg = "failed to write " + refs;
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
public void init(PMContext inContext) throws Exception {
super.init(inContext);
log.info("init mongo db persistance manager: " +
getClass().getName());
Mongo m = new Mongo("localhost");
itsDb = m.getDB(getSchema());
itsNodeCollection = itsDb.getCollection("node");
itsRefCollection = itsDb.getCollection("ref");
itsBlobStore = new MongoGridFSBLOBStore();
// load namespaces
binding = new BundleBinding(errorHandling, itsBlobStore,
getNsIndex(), getNameIndex(), context.getDataStore());
binding.setMinBlobSize(minBlobSize);
log.info("complete init mongo db persistance manager: " +
getClass().getName());
}
public NodeReferences loadReferencesTo(NodeId id) throws
NoSuchItemStateException, ItemStateException {
BasicDBObject theQuery = new BasicDBObject();
theQuery.put("_id", id.toString());
BasicDBObject theRefDb = (BasicDBObject)
itsRefCollection.findOne(theQuery);
if(theRefDb == null) {
throw new NoSuchItemStateException(id.toString());
}
Binary theD = (Binary) theRefDb.get("d");
InputStream in = new ByteArrayInputStream(theD.getData());
NodeReferences theRef = new NodeReferences(id);
try {
org.apache.jackrabbit.core.persistence.util.Serializer.deserialize(theRef,
in);
return theRef;
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
throw (NoSuchItemStateException) e;
}
String msg = "failed to read references: " + id;
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
public synchronized boolean existsReferencesTo(NodeId targetId) throws
ItemStateException {
BasicDBObject theDBObject = new BasicDBObject();
theDBObject.put("_id", targetId.toString());
BasicDBObject theRefDb = (BasicDBObject)
itsRefCollection.findOne(theDBObject);
return theRefDb != null;
}
public Iterable<NodeId> getAllNodeIds(NodeId after, int maxCount)
throws ItemStateException, RepositoryException {
DBObject theSort = new BasicDBObject();
theSort.put("_id", "1");
DBObject theQuery = new BasicDBObject();
theQuery.put("_id", after.toString());
theQuery = QueryBuilder.start().greaterThan(theQuery).get();
DBCursor theCursor =
itsRefCollection.find(theQuery).sort(theSort).limit(maxCount);
ArrayList<NodeId> result = new ArrayList<NodeId>();
for (DBObject theDBObject : theCursor) {
result.add(NodeId.valueOf(theDBObject.get("_id").toString()));
}
return result;
}
@Override
protected BLOBStore getBlobStore() {
return itsBlobStore; //To change body of implemented methods use
File | Settings | File Templates.
}
public String getSchema() {
return itsSchema;
}
public void setSchema(String inSchema) {
itsSchema = inSchema;
}
private class MongoGridFSBLOBStore implements BLOBStore {
private GridFS itsGripFS;
public MongoGridFSBLOBStore() {
itsGripFS = new GridFS( itsDb );
}
public String createId(PropertyId id, int index) {
StringBuffer buf = new StringBuffer();
buf.append(id.getParentId().toString());
buf.append('.');
buf.append(getNsIndex().stringToIndex(id.getName().getNamespaceURI()));
buf.append('.');
buf.append(getNameIndex().stringToIndex(id.getName().getLocalName()));
buf.append('.');
buf.append(index);
return buf.toString();
}
public void put(String blobId, InputStream in, long size) throws
Exception {
GridFSInputFile theGridFSInputFile = itsGripFS.createFile( in,
true );
theGridFSInputFile.put("_id", blobId);
theGridFSInputFile.save();
}
public InputStream get(String blobId) throws Exception {
GridFSDBFile out = itsGripFS.findOne( new BasicDBObject( "_id"
, blobId ) );
return out.getInputStream();
}
public boolean remove(String blobId) throws Exception {
itsGripFS.remove( new BasicDBObject( "_id" , blobId ) );
return true;
}
}
}
- Steven
*****
This e-mail message is intended only for the designated recipient(s) named
above. The information contained in this e-mail and any attachments may be
confidential or legally privileged. If you are not the intended recipient, you
may not review, retain, copy, redistribute or use this e-mail or any attachment
for any purpose, or disclose all or any part of its contents. If you have
received this e-mail in error, please immediately notify the sender by reply
e-mail and permanently delete this e-mail and any attachments from your
computer system.
*****