http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java deleted file mode 100644 index 282ecbb..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.mongodb.instance; - -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map.Entry; - -import javax.annotation.ParametersAreNonnullByDefault; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.BasicDBObjectBuilder; -import com.mongodb.DBObject; - -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; -import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; -import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; -import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; -import mvm.rya.api.instance.RyaDetails.ProspectorDetails; -import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; - -/** - * Serializes configuration details for use in Mongo. - * The {@link DBObject} will look like: - * <pre> - * {@code - * { - * "instanceName": <string>, - * "version": <string>?, - * "entityCentricDetails": <boolean>, - * "geoDetails": <boolean>, - * "pcjDetails": { - * "enabled": <boolean>, - * "fluoName": <string>, - * "pcjs": [{ - * "id": <string>, - * "updateStrategy": <string>, - * "lastUpdate": <date> - * },...,{} - * ] - * }, - * "temporalDetails": <boolean>, - * "freeTextDetails": <boolean>, - * "prospectorDetails": <date>, - * "joinSelectivityDetails": <date> - * } - * </pre> - */ -@ParametersAreNonnullByDefault -public class MongoDetailsAdapter { - public static final String INSTANCE_KEY = "instanceName"; - public static final String VERSION_KEY = "version"; - - public static final String ENTITY_DETAILS_KEY = "entityCentricDetails"; - public static final String GEO_DETAILS_KEY = "geoDetails"; - public static final String PCJ_DETAILS_KEY = "pcjDetails"; - public static final String PCJ_ENABLED_KEY = "enabled"; - public static final String PCJ_FLUO_KEY = "fluoName"; - public static final String PCJ_PCJS_KEY = "pcjs"; - public static final String PCJ_ID_KEY = "id"; - public static final String PCJ_UPDATE_STRAT_KEY = "updateStrategy"; - public static final String PCJ_LAST_UPDATE_KEY = "lastUpdate"; - public static final String TEMPORAL_DETAILS_KEY = "temporalDetails"; - public static final String FREETEXT_DETAILS_KEY = "freeTextDetails"; - - public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails"; - public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails"; - - /** - * Serializes {@link RyaDetails} to mongo {@link DBObject}. - * @param details - The details to be serialized. - * @return The mongo {@link DBObject}. - */ - public static BasicDBObject toDBObject(final RyaDetails details) { - Preconditions.checkNotNull(details); - final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start() - .add(INSTANCE_KEY, details.getRyaInstanceName()) - .add(VERSION_KEY, details.getRyaVersion()) - .add(ENTITY_DETAILS_KEY, details.getEntityCentricIndexDetails().isEnabled()) - .add(GEO_DETAILS_KEY, details.getGeoIndexDetails().isEnabled()) - .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails())) - .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled()) - .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled()); - if(details.getProspectorDetails().getLastUpdated().isPresent()) { - builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get()); - } - if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) { - builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get()); - } - return (BasicDBObject) builder.get(); - } - - private static DBObject toDBObject(final PCJIndexDetails pcjIndexDetails) { - requireNonNull(pcjIndexDetails); - - final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); - - // Is Enabled - builder.add(PCJ_ENABLED_KEY, pcjIndexDetails.isEnabled()); - - // Fluo Details if present. - if(pcjIndexDetails.getFluoDetails().isPresent()) { - builder.add(PCJ_FLUO_KEY, pcjIndexDetails.getFluoDetails().get().getUpdateAppName()); - } - - // Add the PCJDetail objects. - final List<DBObject> pcjDetailsList = new ArrayList<>(); - for(final PCJDetails pcjDetails : pcjIndexDetails.getPCJDetails().values()) { - pcjDetailsList.add( toDBObject( pcjDetails ) ); - } - builder.add(PCJ_PCJS_KEY, pcjDetailsList.toArray()); - - return builder.get(); - } - - static DBObject toDBObject(final PCJDetails pcjDetails) { - requireNonNull(pcjDetails); - - final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); - - // PCJ ID - builder.add(PCJ_ID_KEY, pcjDetails.getId()); - - // PCJ Update Strategy if present. - if(pcjDetails.getUpdateStrategy().isPresent()) { - builder.add(PCJ_UPDATE_STRAT_KEY, pcjDetails.getUpdateStrategy().get().name()); - } - - // Last Update Time if present. - if(pcjDetails.getLastUpdateTime().isPresent()) { - builder.add(PCJ_LAST_UPDATE_KEY, pcjDetails.getLastUpdateTime().get()); - } - - return builder.get(); - } - - public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException { - final BasicDBObject basicObj = (BasicDBObject) mongoObj; - try { - return RyaDetails.builder() - .setRyaInstanceName(basicObj.getString(INSTANCE_KEY)) - .setRyaVersion(basicObj.getString(VERSION_KEY)) - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY))) - .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY))) - .setPCJIndexDetails(getPCJIndexDetails(basicObj)) - .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY))) - .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY))) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY)))) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY)))) - .build(); - } catch(final Exception e) { - throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e); - } - } - - private static PCJIndexDetails.Builder getPCJIndexDetails(final BasicDBObject basicObj) { - final BasicDBObject pcjIndexDBO = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY); - - final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(pcjIndexDBO.getBoolean(PCJ_ENABLED_KEY)) - .setFluoDetails(new FluoDetails(pcjIndexDBO.getString(PCJ_FLUO_KEY))); - - final BasicDBList pcjs = (BasicDBList) pcjIndexDBO.get(PCJ_PCJS_KEY); - if(pcjs != null) { - for(int ii = 0; ii < pcjs.size(); ii++) { - final BasicDBObject pcj = (BasicDBObject) pcjs.get(ii); - pcjBuilder.addPCJDetails( toPCJDetails(pcj) ); - } - } - return pcjBuilder; - } - - static PCJDetails.Builder toPCJDetails(final BasicDBObject dbo) { - requireNonNull(dbo); - - // PCJ ID. - final PCJDetails.Builder builder = PCJDetails.builder() - .setId( dbo.getString(PCJ_ID_KEY) ); - - // PCJ Update Strategy if present. - if(dbo.containsField(PCJ_UPDATE_STRAT_KEY)) { - builder.setUpdateStrategy( PCJUpdateStrategy.valueOf( dbo.getString(PCJ_UPDATE_STRAT_KEY) ) ); - } - - // Last Update Time if present. - if(dbo.containsField(PCJ_LAST_UPDATE_KEY)) { - builder.setLastUpdateTime( dbo.getDate(PCJ_LAST_UPDATE_KEY) ); - } - - return builder; - } - - /** - * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin - * to adapt it into a {@link RyaDetails}. - */ - public static class MalformedRyaDetailsException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Creates a new {@link MalformedRyaDetailsException} - * @param message - The message to be displayed by the exception. - * @param e - The source cause of the exception. - */ - public MalformedRyaDetailsException(final String message, final Throwable e) { - super(message, e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java deleted file mode 100644 index dfefa8f..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java +++ /dev/null @@ -1,144 +0,0 @@ -package mvm.rya.mongodb.instance; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.WriteResult; - -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException; - -/** - * An implementation of {@link RyaDetailsRepository} that stores a Rya - * instance's {@link RyaDetails} in a Mongo document. - */ -@ParametersAreNonnullByDefault -public class MongoRyaInstanceDetailsRepository implements RyaDetailsRepository { - private static final String INSTANCE_DETAILS_COLLECTION_NAME = "instance_details"; - - private final DB db; - private final String instanceName; - - /** - * Constructs an instance of {@link MongoRyaInstanceDetailsRepository}. - * - * @param client - Connects to the instance of Mongo that hosts the Rya instance. (not null) - * @param instanceName - The name of the Rya instance this repository represents. (not null) - */ - public MongoRyaInstanceDetailsRepository(final MongoClient client, final String instanceName) { - checkNotNull(client); - this.instanceName = requireNonNull( instanceName ); - db = client.getDB(this.instanceName); - } - - @Override - public boolean isInitialized() throws RyaDetailsRepositoryException { - final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME); - return col.count() == 1; - } - - @Override - public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException { - // Preconditions. - requireNonNull( details ); - - if(!details.getRyaInstanceName().equals( instanceName )) { - throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " + - "the instance name that this repository is connected to. Make sure you're connected to the" + - "correct Rya instance."); - } - - if(isInitialized()) { - throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" + - instanceName + "'."); - } - - // Create the document that hosts the details if it has not been created yet. - final DBCollection col = db.createCollection(INSTANCE_DETAILS_COLLECTION_NAME, new BasicDBObject()); - - // Write the details to the collection. - col.insert(MongoDetailsAdapter.toDBObject(details)); - } - - @Override - public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException { - // Preconditions. - if(!isInitialized()) { - throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" + - instanceName + "' because it has not been initialized yet."); - } - - // Fetch the value from the collection. - final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME); - //There should only be one document in the collection. - final DBObject mongoObj = col.findOne(); - - try{ - // Deserialize it. - return MongoDetailsAdapter.toRyaDetails( mongoObj ); - } catch (final MalformedRyaDetailsException e) { - throw new RyaDetailsRepositoryException("The existing details details are malformed.", e); - } - } - - @Override - public void update(final RyaDetails oldDetails, final RyaDetails newDetails) - throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException { - // Preconditions. - requireNonNull(oldDetails); - requireNonNull(newDetails); - - if(!newDetails.getRyaInstanceName().equals( instanceName )) { - throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " + - "the instance name that this repository is connected to. Make sure you're connected to the" + - "correct Rya instance."); - } - - if(!isInitialized()) { - throw new NotInitializedException("Could not update the details for the Rya instanced named '" + - instanceName + "' because it has not been initialized yet."); - } - - if(oldDetails.equals(newDetails)) { - return; - } - - final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME); - final DBObject oldObj = MongoDetailsAdapter.toDBObject(oldDetails); - final DBObject newObj = MongoDetailsAdapter.toDBObject(newDetails); - final WriteResult result = col.update(oldObj, newObj); - - //since there is only 1 document, there should only be 1 update. - if(result.getN() != 1) { - throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" + - instanceName + "' because the old value is out of date."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java deleted file mode 100644 index ba37ca1..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -package mvm.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.util.Iterator; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; - -public class NonCloseableRyaStatementCursorIterator implements Iterator<RyaStatement> { - - RyaStatementCursorIterator iterator; - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public RyaStatement next() { - return iterator.next(); - } - - public NonCloseableRyaStatementCursorIterator( - RyaStatementCursorIterator iterator) { - this.iterator = iterator; - } - - @Override - public void remove() { - try { - iterator.remove(); - } catch (RyaDAOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java deleted file mode 100644 index d24cbdc..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java +++ /dev/null @@ -1,125 +0,0 @@ -package mvm.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map.Entry; - -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.mongodb.dao.MongoDBStorageStrategy; - -import org.openrdf.query.BindingSet; - -import com.google.common.collect.Multimap; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; - -public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> { - - private DBCollection coll; - private Multimap<DBObject, BindingSet> rangeMap; - private Iterator<DBObject> queryIterator; - private Long maxResults; - private DBCursor resultCursor; - private RyaStatement currentStatement; - private Collection<BindingSet> currentBindingSetCollection; - private Iterator<BindingSet> currentBindingSetIterator; - private MongoDBStorageStrategy strategy; - - public RyaStatementBindingSetCursorIterator(DBCollection coll, - Multimap<DBObject, BindingSet> rangeMap, MongoDBStorageStrategy strategy) { - this.coll = coll; - this.rangeMap = rangeMap; - this.queryIterator = rangeMap.keySet().iterator(); - this.strategy = strategy; - } - - @Override - public boolean hasNext() { - if (!currentBindingSetIteratorIsValid()) { - findNextResult(); - } - return currentBindingSetIteratorIsValid(); - } - - @Override - public Entry<RyaStatement, BindingSet> next() { - if (!currentBindingSetIteratorIsValid()) { - findNextResult(); - } - if (currentBindingSetIteratorIsValid()) { - BindingSet currentBindingSet = currentBindingSetIterator.next(); - return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentStatement, currentBindingSet); - } - return null; - } - - private boolean currentBindingSetIteratorIsValid() { - return (currentBindingSetIterator != null) && currentBindingSetIterator.hasNext(); - } - - private void findNextResult() { - if (!currentResultCursorIsValid()) { - findNextValidResultCursor(); - } - if (currentResultCursorIsValid()) { - // convert to Rya Statement - DBObject queryResult = resultCursor.next(); - currentStatement = strategy.deserializeDBObject(queryResult); - currentBindingSetIterator = currentBindingSetCollection.iterator(); - } - } - - private void findNextValidResultCursor() { - while (queryIterator.hasNext()){ - DBObject currentQuery = queryIterator.next(); - resultCursor = coll.find(currentQuery); - currentBindingSetCollection = rangeMap.get(currentQuery); - if (resultCursor.hasNext()) return; - } - } - - private boolean currentResultCursorIsValid() { - return (resultCursor != null) && resultCursor.hasNext(); - } - - - public void setMaxResults(Long maxResults) { - this.maxResults = maxResults; - } - - @Override - public void close() throws RyaDAOException { - // TODO don't know what to do here - } - - @Override - public void remove() throws RyaDAOException { - next(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java deleted file mode 100644 index 83bd2d4..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java +++ /dev/null @@ -1,67 +0,0 @@ -package mvm.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; - -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; - -import org.calrissian.mango.collect.CloseableIterable; -import org.calrissian.mango.collect.CloseableIterator; -import org.openrdf.query.BindingSet; - -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; - -public class RyaStatementCursorIterable implements CloseableIterable<RyaStatement> { - - - private NonCloseableRyaStatementCursorIterator iterator; - - public RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) { - this.iterator = iterator; - } - - @Override - public Iterator<RyaStatement> iterator() { - // TODO Auto-generated method stub - return iterator; - } - - @Override - public void closeQuietly() { - //TODO don't know what to do here - } - - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java deleted file mode 100644 index 8df2c60..0000000 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java +++ /dev/null @@ -1,104 +0,0 @@ -package mvm.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; - -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.mongodb.dao.MongoDBStorageStrategy; - -import org.calrissian.mango.collect.CloseableIterable; -import org.openrdf.query.BindingSet; - -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; - -public class RyaStatementCursorIterator implements CloseableIteration<RyaStatement, RyaDAOException> { - - private DBCollection coll; - private Iterator<DBObject> queryIterator; - private DBCursor currentCursor; - private MongoDBStorageStrategy strategy; - private Long maxResults; - - public RyaStatementCursorIterator(DBCollection coll, Set<DBObject> queries, MongoDBStorageStrategy strategy) { - this.coll = coll; - this.queryIterator = queries.iterator(); - this.strategy = strategy; - } - - @Override - public boolean hasNext() { - if (!currentCursorIsValid()) { - findNextValidCursor(); - } - return currentCursorIsValid(); - } - - @Override - public RyaStatement next() { - if (!currentCursorIsValid()) { - findNextValidCursor(); - } - if (currentCursorIsValid()) { - // convert to Rya Statement - DBObject queryResult = currentCursor.next(); - RyaStatement statement = strategy.deserializeDBObject(queryResult); - return statement; - } - return null; - } - - private void findNextValidCursor() { - while (queryIterator.hasNext()){ - DBObject currentQuery = queryIterator.next(); - currentCursor = coll.find(currentQuery); - if (currentCursor.hasNext()) break; - } - } - - private boolean currentCursorIsValid() { - return (currentCursor != null) && currentCursor.hasNext(); - } - - - public void setMaxResults(Long maxResults) { - this.maxResults = maxResults; - } - - @Override - public void close() throws RyaDAOException { - // TODO don't know what to do here - } - - @Override - public void remove() throws RyaDAOException { - next(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java new file mode 100644 index 0000000..77a9f16 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java @@ -0,0 +1,138 @@ +package mvm.rya.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.io.IOException; + +import org.apache.commons.configuration.ConfigurationRuntimeException; +import org.apache.hadoop.conf.Configuration; + +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.MongoException; +import com.mongodb.ServerAddress; + +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; + +/** + * Mongo convention generally allows for a single instance of a {@link MongoClient} + * throughout the life cycle of an application. This MongoConnectorFactory lazy + * loads a Mongo Client and uses the same one whenever {@link MongoConnectorFactory#getMongoClient(Configuration)} + * is invoked. + */ +public class MongoConnectorFactory { + private static MongoClient mongoClient; + + private final static String MSG_INTRO = "Failed to connect to MongoDB: "; + + /** + * @param conf The {@link Configuration} defining how to construct the MongoClient. + * @return A {@link MongoClient}. This client is lazy loaded and the same one + * is used throughout the lifecycle of the application. + * @throws IOException - if MongodForTestsFactory constructor has an io exception. + * @throws ConfigurationRuntimeException - Thrown if the configured server, port, user, or others are missing. + * @throws MongoException if can't connect despite conf parameters are given + */ + public static synchronized MongoClient getMongoClient(final Configuration conf) + throws ConfigurationRuntimeException, MongoException { + if (mongoClient == null) { + // The static client has not yet created, is it a test/mock instance, or a service? + if (conf.getBoolean(MongoDBRdfConfiguration.USE_TEST_MONGO, false)) { + createMongoClientForTests(); + } else { + createMongoClientForServer(conf); + } + } + return mongoClient; + } + + /** + * Create a local temporary MongoDB instance and client object and assign it to this class's static mongoClient + * @throws MongoException if can't connect + */ + private static void createMongoClientForTests() throws MongoException { + try { + MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION); + mongoClient = testsFactory.newMongo(); + } catch (IOException e) { + // Rethrow as an unchecked error. Since we are in a test mode here, just fail fast. + throw new MongoException(MSG_INTRO+"creating a factory for a test/mock MongoDB instance.",e); + } + } + + /** + * Create a MongoDB client object and assign it to this class's static mongoClient + * @param conf configuration containing connection parameters + * @throws ConfigurationRuntimeException - Thrown if the configured server, port, user, or others are missing. + * @throws MongoException if can't connect despite conf parameters are given + */ + private static void createMongoClientForServer(final Configuration conf) + throws ConfigurationRuntimeException, MongoException { + // Connect to a running Mongo server + final String host = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE), MSG_INTRO+"host name is required"); + final int port = requireNonNullInt(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT), MSG_INTRO+"Port number is required."); + ServerAddress server = new ServerAddress(host, port); + // check for authentication credentials + if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) { + final String username = conf.get(MongoDBRdfConfiguration.MONGO_USER); + final String dbName = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME), + MSG_INTRO + MongoDBRdfConfiguration.MONGO_DB_NAME + " is null but required configuration if " + + MongoDBRdfConfiguration.MONGO_USER + " is configured."); + final char[] pswd = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD), + MSG_INTRO + MongoDBRdfConfiguration.MONGO_USER_PASSWORD + " is null but required configuration if " + + MongoDBRdfConfiguration.MONGO_USER + " is configured.").toCharArray(); + final MongoCredential cred = MongoCredential.createCredential(username, dbName, pswd); + mongoClient = new MongoClient(server, Arrays.asList(cred)); + } else { + // No user was configured: + mongoClient = new MongoClient(server); + } + } + + /** + * Throw exception for un-configured required values. + * + * @param required String to check + * @param message throw configuration exception with this description + * @return unaltered required string + * @throws ConfigurationRuntimeException if required is null + */ + private static String requireNonNull(String required, String message) throws ConfigurationRuntimeException { + if (required == null) + throw new ConfigurationRuntimeException(message); + return required; + } + + /* + * Same as above, check that it is a integer and return the parsed integer. + */ + private static int requireNonNullInt(String required, String message) throws ConfigurationRuntimeException { + if (required == null) + throw new ConfigurationRuntimeException(message); + try { + return Integer.parseInt(required); + } catch (NumberFormatException e) { + throw new ConfigurationRuntimeException(message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java new file mode 100644 index 0000000..afa0a77 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java @@ -0,0 +1,202 @@ +package mvm.rya.mongodb; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.calrissian.mango.collect.CloseableIterable; +import org.openrdf.query.BindingSet; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.query.BatchRyaQuery; +import mvm.rya.api.persist.query.RyaQuery; +import mvm.rya.api.persist.query.RyaQueryEngine; +import mvm.rya.mongodb.dao.MongoDBStorageStrategy; +import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import mvm.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator; +import mvm.rya.mongodb.iter.RyaStatementBindingSetCursorIterator; +import mvm.rya.mongodb.iter.RyaStatementCursorIterable; +import mvm.rya.mongodb.iter.RyaStatementCursorIterator; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +/** + * Date: 7/17/12 + * Time: 9:28 AM + */ +public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguration>, Closeable { + + private MongoDBRdfConfiguration configuration; + private final MongoClient mongoClient; + private final DBCollection coll; + private final MongoDBStorageStrategy strategy; + + public MongoDBQueryEngine(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) { + this.mongoClient = checkNotNull(mongoClient); + final DB db = mongoClient.getDB( conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME)); + coll = db.getCollection(conf.getTriplesCollectionName()); + strategy = new SimpleMongoDBStorageStrategy(); + } + + + @Override + public void setConf(final MongoDBRdfConfiguration conf) { + configuration = conf; + } + + @Override + public MongoDBRdfConfiguration getConf() { + return configuration; + } + + @Override + public CloseableIteration<RyaStatement, RyaDAOException> query( + final RyaStatement stmt, MongoDBRdfConfiguration conf) + throws RyaDAOException { + if (conf == null) { + conf = configuration; + } + final Long maxResults = conf.getLimit(); + final Set<DBObject> queries = new HashSet<DBObject>(); + final DBObject query = strategy.getQuery(stmt); + queries.add(query); + final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy); + + if (maxResults != null) { + iterator.setMaxResults(maxResults); + } + return iterator; + } + @Override + public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet( + final Collection<Entry<RyaStatement, BindingSet>> stmts, + MongoDBRdfConfiguration conf) throws RyaDAOException { + if (conf == null) { + conf = configuration; + } + final Long maxResults = conf.getLimit(); + final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create(); + + //TODO: cannot span multiple tables here + try { + for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) { + final RyaStatement stmt = stmtbs.getKey(); + final BindingSet bs = stmtbs.getValue(); + final DBObject query = strategy.getQuery(stmt); + rangeMap.put(query, bs); + } + + // TODO not sure what to do about regex ranges? + final RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator(coll, rangeMap, strategy); + + if (maxResults != null) { + iterator.setMaxResults(maxResults); + } + return iterator; + } catch (final Exception e) { + throw new RyaDAOException(e); + } + + } + @Override + public CloseableIteration<RyaStatement, RyaDAOException> batchQuery( + final Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf) + throws RyaDAOException { + if (conf == null) { + conf = configuration; + } + final Long maxResults = conf.getLimit(); + final Set<DBObject> queries = new HashSet<DBObject>(); + + try { + for (final RyaStatement stmt : stmts) { + queries.add( strategy.getQuery(stmt)); + } + + // TODO not sure what to do about regex ranges? + final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy); + + if (maxResults != null) { + iterator.setMaxResults(maxResults); + } + return iterator; + } catch (final Exception e) { + throw new RyaDAOException(e); + } + + } + @Override + public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery) + throws RyaDAOException { + final Set<DBObject> queries = new HashSet<DBObject>(); + + try { + queries.add( strategy.getQuery(ryaQuery)); + + // TODO not sure what to do about regex ranges? + // TODO this is gross + final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy))); + + return iterator; + } catch (final Exception e) { + throw new RyaDAOException(e); + } + } + @Override + public CloseableIterable<RyaStatement> query(final BatchRyaQuery batchRyaQuery) + throws RyaDAOException { + try { + final Set<DBObject> queries = new HashSet<DBObject>(); + for (final RyaStatement statement : batchRyaQuery.getQueries()){ + queries.add( strategy.getQuery(statement)); + + } + + // TODO not sure what to do about regex ranges? + // TODO this is gross + final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy))); + + return iterator; + } catch (final Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void close() throws IOException { + if (mongoClient != null){ mongoClient.close(); } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java new file mode 100644 index 0000000..e8e301d --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java @@ -0,0 +1,128 @@ +package mvm.rya.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import java.util.List; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; +import com.mongodb.MongoClient; + +public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { + public static final String MONGO_INSTANCE = "mongo.db.instance"; + public static final String MONGO_INSTANCE_PORT = "mongo.db.port"; + public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist"; + public static final String MONGO_DB_NAME = "mongo.db.name"; + public static final String MONGO_COLLECTION_PREFIX = "mongo.db.collectionprefix"; + public static final String MONGO_USER = "mongo.db.user"; + public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword"; + public static final String USE_TEST_MONGO = "mongo.db.test"; + public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; + private MongoClient mongoClient; + + public MongoDBRdfConfiguration() { + super(); + } + + public MongoDBRdfConfiguration(Configuration other) { + super(other); + } + + @Override + public MongoDBRdfConfiguration clone() { + return new MongoDBRdfConfiguration(this); + } + + public boolean getUseTestMongo() { + return this.getBoolean(USE_TEST_MONGO, false); + } + + public void setUseTestMongo(boolean useTestMongo) { + this.setBoolean(USE_TEST_MONGO, useTestMongo); + } + + public String getTriplesCollectionName() { + return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_triples"; + } + + public String getCollectionName() { + return this.get(MONGO_COLLECTION_PREFIX, "rya"); + } + + public void setCollectionName(String name) { + this.set(MONGO_COLLECTION_PREFIX, name); + } + + public String getMongoInstance() { + return this.get(MONGO_INSTANCE, "localhost"); + } + + public void setMongoInstance(String name) { + this.set(MONGO_INSTANCE, name); + } + + public String getMongoPort() { + return this.get(MONGO_INSTANCE_PORT, "27017"); + } + + public void setMongoPort(String name) { + this.set(MONGO_INSTANCE_PORT, name); + } + + public String getMongoDBName() { + return this.get(MONGO_DB_NAME, "rya"); + } + + public void setMongoDBName(String name) { + this.set(MONGO_DB_NAME, name); + } + + public String getNameSpacesCollectionName() { + return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_ns"; + } + + public void setAdditionalIndexers(Class<? extends MongoSecondaryIndex>... indexers) { + List<String> strs = Lists.newArrayList(); + for (Class<?> ai : indexers){ + strs.add(ai.getName()); + } + + setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{})); + } + + public List<MongoSecondaryIndex> getAdditionalIndexers() { + return getInstances(CONF_ADDITIONAL_INDEXERS, MongoSecondaryIndex.class); + } + + public void setMongoClient(MongoClient client){ + this.mongoClient = client; + } + + public MongoClient getMongoClient() { + return mongoClient; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java new file mode 100644 index 0000000..bb5d58e --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java @@ -0,0 +1,233 @@ +package mvm.rya.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.log4j.Logger; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.DuplicateKeyException; +import com.mongodb.InsertOptions; +import com.mongodb.MongoClient; + +import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; +import mvm.rya.api.persist.query.RyaQueryEngine; +import mvm.rya.mongodb.dao.MongoDBNamespaceManager; +import mvm.rya.mongodb.dao.MongoDBStorageStrategy; +import mvm.rya.mongodb.dao.SimpleMongoDBNamespaceManager; +import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; + +/** + * Default DAO for mongo backed RYA allowing for CRUD operations. + */ +public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ + private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class); + + private MongoDBRdfConfiguration conf; + private MongoClient mongoClient; + private DB db; + private DBCollection coll; + private MongoDBQueryEngine queryEngine; + private MongoDBStorageStrategy storageStrategy; + private MongoDBNamespaceManager nameSpaceManager; + private MongodForTestsFactory testsFactory; + + private List<MongoSecondaryIndex> secondaryIndexers; + + /** + * Creates a new {@link MongoDBRyaDAO} + * @param conf + * @throws RyaDAOException + */ + public MongoDBRyaDAO(final MongoDBRdfConfiguration conf) throws RyaDAOException, NumberFormatException, UnknownHostException { + this.conf = conf; + mongoClient = MongoConnectorFactory.getMongoClient(conf); + conf.setMongoClient(mongoClient); + init(); + } + + + public MongoDBRyaDAO(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) throws RyaDAOException{ + this.conf = conf; + this.mongoClient = mongoClient; + conf.setMongoClient(mongoClient); + init(); + } + + @Override + public void setConf(final MongoDBRdfConfiguration conf) { + this.conf = conf; + } + + public MongoClient getMongoClient(){ + return mongoClient; + } + + public void setDB(final DB db) { + this.db = db; + } + + + public void setDBCollection(final DBCollection coll) { + this.coll = coll; + } + + @Override + public MongoDBRdfConfiguration getConf() { + return conf; + } + + @Override + public void init() throws RyaDAOException { + secondaryIndexers = conf.getAdditionalIndexers(); + for(final MongoSecondaryIndex index: secondaryIndexers) { + index.setConf(conf); + index.setClient(mongoClient); + } + + db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME)); + coll = db.getCollection(conf.getTriplesCollectionName()); + nameSpaceManager = new SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName())); + queryEngine = new MongoDBQueryEngine(conf, mongoClient); + storageStrategy = new SimpleMongoDBStorageStrategy(); + storageStrategy.createIndices(coll); + for(final MongoSecondaryIndex index: secondaryIndexers) { + index.init(); + } + } + + @Override + public boolean isInitialized() throws RyaDAOException { + return true; + } + + @Override + public void destroy() throws RyaDAOException { + if (mongoClient != null) { + mongoClient.close(); + } + if (conf.getUseTestMongo()) { + testsFactory.shutdown(); + } + + IOUtils.closeQuietly(queryEngine); + } + + @Override + public void add(final RyaStatement statement) throws RyaDAOException { + // add it to the collection + try { + coll.insert(storageStrategy.serialize(statement)); + for(final RyaSecondaryIndexer index: secondaryIndexers) { + index.storeStatement(statement); + } + } catch (IOException e) { + log.error("Unable to add: " + statement.toString()); + throw new RyaDAOException(e); + } + catch (DuplicateKeyException e){ + log.error("Attempting to load duplicate triple: " + statement.toString()); + } + } + + @Override + public void add(final Iterator<RyaStatement> statement) throws RyaDAOException { + final List<DBObject> dbInserts = new ArrayList<DBObject>(); + while (statement.hasNext()){ + final RyaStatement ryaStatement = statement.next(); + final DBObject insert = storageStrategy.serialize(ryaStatement); + dbInserts.add(insert); + + try { + for (final RyaSecondaryIndexer index : secondaryIndexers) { + index.storeStatement(ryaStatement); + } + } catch (final IOException e) { + log.error("Failed to add: " + ryaStatement.toString() + " to the indexer"); + } + + } + coll.insert(dbInserts, new InsertOptions().continueOnError(true)); + } + + @Override + public void delete(final RyaStatement statement, final MongoDBRdfConfiguration conf) + throws RyaDAOException { + final DBObject obj = storageStrategy.getQuery(statement); + coll.remove(obj); + } + + @Override + public void dropGraph(final MongoDBRdfConfiguration conf, final RyaURI... graphs) + throws RyaDAOException { + + } + + @Override + public void delete(final Iterator<RyaStatement> statements, + final MongoDBRdfConfiguration conf) throws RyaDAOException { + while (statements.hasNext()){ + final RyaStatement ryaStatement = statements.next(); + coll.remove(storageStrategy.getQuery(ryaStatement)); + } + + } + + @Override + public String getVersion() throws RyaDAOException { + return "1.0"; + } + + @Override + public RyaQueryEngine<MongoDBRdfConfiguration> getQueryEngine() { + return queryEngine; + } + + @Override + public RyaNamespaceManager<MongoDBRdfConfiguration> getNamespaceManager() { + return nameSpaceManager; + } + + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + + } + + @Override + public void dropAndDestroy() throws RyaDAOException { + db.dropDatabase(); // this is dangerous! + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java new file mode 100644 index 0000000..e32216f --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java @@ -0,0 +1,31 @@ +package mvm.rya.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.mongodb.MongoClient; + +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +public interface MongoSecondaryIndex extends RyaSecondaryIndexer{ + public void init(); + + public void setClient(MongoClient client); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java new file mode 100644 index 0000000..fd9b659 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java @@ -0,0 +1,35 @@ +package mvm.rya.mongodb.dao; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.persist.query.RyaQuery; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +public interface MongoDBNamespaceManager extends RyaNamespaceManager<MongoDBRdfConfiguration>{ + + public void createIndices(DBCollection coll); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java new file mode 100644 index 0000000..5ae371b --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java @@ -0,0 +1,45 @@ +package mvm.rya.mongodb.dao; + +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.query.RyaQuery; + +/** + * Defines how objects are stored in MongoDB. + * <T> - The object to store in MongoDB + */ +public interface MongoDBStorageStrategy<T> { + + public DBObject getQuery(T statement); + + public RyaStatement deserializeDBObject(DBObject queryResult); + + public DBObject serialize(T statement); + + public DBObject getQuery(RyaQuery ryaQuery); + + public void createIndices(DBCollection coll); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java new file mode 100644 index 0000000..259420b --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java @@ -0,0 +1,181 @@ +package mvm.rya.mongodb.dao; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import info.aduna.iteration.CloseableIteration; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +import org.apache.commons.codec.binary.Hex; +import org.openrdf.model.Namespace; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; + +public class SimpleMongoDBNamespaceManager implements MongoDBNamespaceManager { + + public class NamespaceImplementation implements Namespace { + + private String namespace; + private String prefix; + + public NamespaceImplementation(String namespace, String prefix) { + this.namespace = namespace; + this.prefix = prefix; + } + + @Override + public int compareTo(Namespace o) { + if (!namespace.equalsIgnoreCase(o.getName())) return namespace.compareTo(o.getName()); + if (!prefix.equalsIgnoreCase(o.getPrefix())) return prefix.compareTo(o.getPrefix()); + return 0; + } + + @Override + public String getName() { + return namespace; + } + + @Override + public String getPrefix() { + return prefix; + } + + } + + public class MongoCursorIteration implements + CloseableIteration<Namespace, RyaDAOException> { + private DBCursor cursor; + + public MongoCursorIteration(DBCursor cursor2) { + this.cursor = cursor2; + } + + @Override + public boolean hasNext() throws RyaDAOException { + return cursor.hasNext(); + } + + @Override + public Namespace next() throws RyaDAOException { + DBObject ns = cursor.next(); + Map values = ns.toMap(); + String namespace = (String) values.get(NAMESPACE); + String prefix = (String) values.get(PREFIX); + + Namespace temp = new NamespaceImplementation(namespace, prefix); + return temp; + } + + @Override + public void remove() throws RyaDAOException { + next(); + } + + @Override + public void close() throws RyaDAOException { + cursor.close(); + } + + } + + private static final String ID = "_id"; + private static final String PREFIX = "prefix"; + private static final String NAMESPACE = "namespace"; + private MongoDBRdfConfiguration conf; + private DBCollection nsColl; + + + public SimpleMongoDBNamespaceManager(DBCollection nameSpaceCollection) { + nsColl = nameSpaceCollection; + } + + @Override + public void createIndices(DBCollection coll){ + coll.createIndex(PREFIX); + coll.createIndex(NAMESPACE); + } + + + @Override + public void setConf(MongoDBRdfConfiguration paramC) { + this.conf = paramC; + } + + @Override + public MongoDBRdfConfiguration getConf() { + // TODO Auto-generated method stub + return conf; + } + + @Override + public void addNamespace(String prefix, String namespace) + throws RyaDAOException { + String id = prefix; + byte[] bytes = id.getBytes(); + try { + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + bytes = digest.digest(bytes); + } catch (NoSuchAlgorithmException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes))) + .append(PREFIX, prefix) + .append(NAMESPACE, namespace); + nsColl.insert(doc); + + } + + @Override + public String getNamespace(String prefix) throws RyaDAOException { + DBObject query = new BasicDBObject().append(PREFIX, prefix); + DBCursor cursor = nsColl.find(query); + String nameSpace = prefix; + while (cursor.hasNext()){ + DBObject obj = cursor.next(); + nameSpace = (String) obj.toMap().get(NAMESPACE); + } + return nameSpace; + } + + @Override + public void removeNamespace(String prefix) throws RyaDAOException { + DBObject query = new BasicDBObject().append(PREFIX, prefix); + nsColl.remove(query); + } + + @Override + public CloseableIteration<? extends Namespace, RyaDAOException> iterateNamespace() + throws RyaDAOException { + DBObject query = new BasicDBObject(); + DBCursor cursor = nsColl.find(query); + return new MongoCursorIteration(cursor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java new file mode 100644 index 0000000..d09316a --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java @@ -0,0 +1,162 @@ +package mvm.rya.mongodb.dao; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.openrdf.model.vocabulary.XMLSchema.ANYURI; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import org.apache.commons.codec.binary.Hex; +import org.apache.log4j.Logger; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.query.RyaQuery; + +/** + * Defines how {@link RyaStatement}s are stored in MongoDB. + */ +public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaStatement> { + private static final Logger LOG = Logger.getLogger(SimpleMongoDBStorageStrategy.class); + protected static final String ID = "_id"; + protected static final String OBJECT_TYPE = "objectType"; + protected static final String OBJECT_TYPE_VALUE = XMLSchema.ANYURI.stringValue(); + protected static final String CONTEXT = "context"; + protected static final String PREDICATE = "predicate"; + protected static final String OBJECT = "object"; + protected static final String SUBJECT = "subject"; + public static final String TIMESTAMP = "insertTimestamp"; + protected ValueFactoryImpl factory = new ValueFactoryImpl(); + + @Override + public void createIndices(final DBCollection coll){ + BasicDBObject doc = new BasicDBObject(); + doc.put(SUBJECT, 1); + doc.put(PREDICATE, 1); + coll.createIndex(doc); + doc = new BasicDBObject(PREDICATE, 1); + doc.put(OBJECT, 1); + doc.put(OBJECT_TYPE, 1); + coll.createIndex(doc); + doc = new BasicDBObject(OBJECT, 1); + doc = new BasicDBObject(OBJECT_TYPE, 1); + doc.put(SUBJECT, 1); + coll.createIndex(doc); + } + + @Override + public DBObject getQuery(final RyaStatement stmt) { + final RyaURI subject = stmt.getSubject(); + final RyaURI predicate = stmt.getPredicate(); + final RyaType object = stmt.getObject(); + final RyaURI context = stmt.getContext(); + final BasicDBObject query = new BasicDBObject(); + if (subject != null){ + query.append(SUBJECT, subject.getData()); + } + if (object != null){ + query.append(OBJECT, object.getData()); + query.append(OBJECT_TYPE, object.getDataType().toString()); + } + if (predicate != null){ + query.append(PREDICATE, predicate.getData()); + } + if (context != null){ + query.append(CONTEXT, context.getData()); + } + + return query; + } + + @Override + public RyaStatement deserializeDBObject(final DBObject queryResult) { + final Map result = queryResult.toMap(); + final String subject = (String) result.get(SUBJECT); + final String object = (String) result.get(OBJECT); + final String objectType = (String) result.get(OBJECT_TYPE); + final String predicate = (String) result.get(PREDICATE); + final String context = (String) result.get(CONTEXT); + final Long timestamp = (Long) result.get(TIMESTAMP); + RyaType objectRya = null; + if (objectType.equalsIgnoreCase(ANYURI.stringValue())){ + objectRya = new RyaURI(object); + } + else { + objectRya = new RyaType(factory.createURI(objectType), object); + } + + final RyaStatement statement; + if (!context.isEmpty()){ + statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya, + new RyaURI(context)); + } else { + statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya); + } + + if(timestamp != null) { + statement.setTimestamp(timestamp); + } + return statement; + } + + @Override + public DBObject serialize(final RyaStatement statement){ + return serializeInternal(statement); + } + + public BasicDBObject serializeInternal(final RyaStatement statement){ + String context = ""; + if (statement.getContext() != null){ + context = statement.getContext().getData(); + } + final String id = statement.getSubject().getData() + " " + + statement.getPredicate().getData() + " " + statement.getObject().getData() + " " + context; + byte[] bytes = id.getBytes(); + try { + final MessageDigest digest = MessageDigest.getInstance("SHA-1"); + bytes = digest.digest(bytes); + } catch (final NoSuchAlgorithmException e) { + LOG.error("Unable to perform SHA-1 on the ID, defaulting to raw bytes.", e); + } + final BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes))) + .append(SUBJECT, statement.getSubject().getData()) + .append(PREDICATE, statement.getPredicate().getData()) + .append(OBJECT, statement.getObject().getData()) + .append(OBJECT_TYPE, statement.getObject().getDataType().toString()) + .append(CONTEXT, context) + .append(TIMESTAMP, statement.getTimestamp()); + return doc; + + } + + @Override + public DBObject getQuery(final RyaQuery ryaQuery) { + return getQuery(ryaQuery.getQuery()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java new file mode 100644 index 0000000..282ecbb --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.mongodb.instance; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map.Entry; + +import javax.annotation.ParametersAreNonnullByDefault; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBObject; + +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; + +/** + * Serializes configuration details for use in Mongo. + * The {@link DBObject} will look like: + * <pre> + * {@code + * { + * "instanceName": <string>, + * "version": <string>?, + * "entityCentricDetails": <boolean>, + * "geoDetails": <boolean>, + * "pcjDetails": { + * "enabled": <boolean>, + * "fluoName": <string>, + * "pcjs": [{ + * "id": <string>, + * "updateStrategy": <string>, + * "lastUpdate": <date> + * },...,{} + * ] + * }, + * "temporalDetails": <boolean>, + * "freeTextDetails": <boolean>, + * "prospectorDetails": <date>, + * "joinSelectivityDetails": <date> + * } + * </pre> + */ +@ParametersAreNonnullByDefault +public class MongoDetailsAdapter { + public static final String INSTANCE_KEY = "instanceName"; + public static final String VERSION_KEY = "version"; + + public static final String ENTITY_DETAILS_KEY = "entityCentricDetails"; + public static final String GEO_DETAILS_KEY = "geoDetails"; + public static final String PCJ_DETAILS_KEY = "pcjDetails"; + public static final String PCJ_ENABLED_KEY = "enabled"; + public static final String PCJ_FLUO_KEY = "fluoName"; + public static final String PCJ_PCJS_KEY = "pcjs"; + public static final String PCJ_ID_KEY = "id"; + public static final String PCJ_UPDATE_STRAT_KEY = "updateStrategy"; + public static final String PCJ_LAST_UPDATE_KEY = "lastUpdate"; + public static final String TEMPORAL_DETAILS_KEY = "temporalDetails"; + public static final String FREETEXT_DETAILS_KEY = "freeTextDetails"; + + public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails"; + public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails"; + + /** + * Serializes {@link RyaDetails} to mongo {@link DBObject}. + * @param details - The details to be serialized. + * @return The mongo {@link DBObject}. + */ + public static BasicDBObject toDBObject(final RyaDetails details) { + Preconditions.checkNotNull(details); + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start() + .add(INSTANCE_KEY, details.getRyaInstanceName()) + .add(VERSION_KEY, details.getRyaVersion()) + .add(ENTITY_DETAILS_KEY, details.getEntityCentricIndexDetails().isEnabled()) + .add(GEO_DETAILS_KEY, details.getGeoIndexDetails().isEnabled()) + .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails())) + .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled()) + .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled()); + if(details.getProspectorDetails().getLastUpdated().isPresent()) { + builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get()); + } + if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) { + builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get()); + } + return (BasicDBObject) builder.get(); + } + + private static DBObject toDBObject(final PCJIndexDetails pcjIndexDetails) { + requireNonNull(pcjIndexDetails); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); + + // Is Enabled + builder.add(PCJ_ENABLED_KEY, pcjIndexDetails.isEnabled()); + + // Fluo Details if present. + if(pcjIndexDetails.getFluoDetails().isPresent()) { + builder.add(PCJ_FLUO_KEY, pcjIndexDetails.getFluoDetails().get().getUpdateAppName()); + } + + // Add the PCJDetail objects. + final List<DBObject> pcjDetailsList = new ArrayList<>(); + for(final PCJDetails pcjDetails : pcjIndexDetails.getPCJDetails().values()) { + pcjDetailsList.add( toDBObject( pcjDetails ) ); + } + builder.add(PCJ_PCJS_KEY, pcjDetailsList.toArray()); + + return builder.get(); + } + + static DBObject toDBObject(final PCJDetails pcjDetails) { + requireNonNull(pcjDetails); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); + + // PCJ ID + builder.add(PCJ_ID_KEY, pcjDetails.getId()); + + // PCJ Update Strategy if present. + if(pcjDetails.getUpdateStrategy().isPresent()) { + builder.add(PCJ_UPDATE_STRAT_KEY, pcjDetails.getUpdateStrategy().get().name()); + } + + // Last Update Time if present. + if(pcjDetails.getLastUpdateTime().isPresent()) { + builder.add(PCJ_LAST_UPDATE_KEY, pcjDetails.getLastUpdateTime().get()); + } + + return builder.get(); + } + + public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException { + final BasicDBObject basicObj = (BasicDBObject) mongoObj; + try { + return RyaDetails.builder() + .setRyaInstanceName(basicObj.getString(INSTANCE_KEY)) + .setRyaVersion(basicObj.getString(VERSION_KEY)) + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY))) + .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY))) + .setPCJIndexDetails(getPCJIndexDetails(basicObj)) + .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY))) + .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY))) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY)))) + .build(); + } catch(final Exception e) { + throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e); + } + } + + private static PCJIndexDetails.Builder getPCJIndexDetails(final BasicDBObject basicObj) { + final BasicDBObject pcjIndexDBO = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY); + + final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() + .setEnabled(pcjIndexDBO.getBoolean(PCJ_ENABLED_KEY)) + .setFluoDetails(new FluoDetails(pcjIndexDBO.getString(PCJ_FLUO_KEY))); + + final BasicDBList pcjs = (BasicDBList) pcjIndexDBO.get(PCJ_PCJS_KEY); + if(pcjs != null) { + for(int ii = 0; ii < pcjs.size(); ii++) { + final BasicDBObject pcj = (BasicDBObject) pcjs.get(ii); + pcjBuilder.addPCJDetails( toPCJDetails(pcj) ); + } + } + return pcjBuilder; + } + + static PCJDetails.Builder toPCJDetails(final BasicDBObject dbo) { + requireNonNull(dbo); + + // PCJ ID. + final PCJDetails.Builder builder = PCJDetails.builder() + .setId( dbo.getString(PCJ_ID_KEY) ); + + // PCJ Update Strategy if present. + if(dbo.containsField(PCJ_UPDATE_STRAT_KEY)) { + builder.setUpdateStrategy( PCJUpdateStrategy.valueOf( dbo.getString(PCJ_UPDATE_STRAT_KEY) ) ); + } + + // Last Update Time if present. + if(dbo.containsField(PCJ_LAST_UPDATE_KEY)) { + builder.setLastUpdateTime( dbo.getDate(PCJ_LAST_UPDATE_KEY) ); + } + + return builder; + } + + /** + * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin + * to adapt it into a {@link RyaDetails}. + */ + public static class MalformedRyaDetailsException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Creates a new {@link MalformedRyaDetailsException} + * @param message - The message to be displayed by the exception. + * @param e - The source cause of the exception. + */ + public MalformedRyaDetailsException(final String message, final Throwable e) { + super(message, e); + } + } +}
