http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java new file mode 100644 index 0000000..2d4622f --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.EntityIndexException; +import org.apache.rya.indexing.entity.model.Type; + +import com.google.common.base.Optional; + +import mvm.rya.api.domain.RyaURI; + +/** + * Stores and provides access to {@link Type}s. + */ +@ParametersAreNonnullByDefault +public interface TypeStorage { + + /** + * Creates a new {@link Type} within the storage. The new Type's ID must be unique. + * + * @param type - The {@link Type} to create. (not null) + * @throws TypeStorageException A problem occurred while creating the Type. + */ + public void create(Type type) throws TypeStorageException; + + /** + * Get a {@link Type} from the storage by its ID. + * + * @param typeId - The {@link Type}'s ID. (not null) + * @return The {@link Type} if one exists for the ID. + * @throws TypeStorageException A problem occurred while fetching from the storage. + */ + public Optional<Type> get(RyaURI typeId) throws TypeStorageException; + + /** + * Get all {@link Type}s that include a specific {@link Property} name. + * + * @param propertyName - The name to search for. (not null) + * @return All {@link Type}s that include {@code propertyName}. + * @throws TypeStorageException A problem occurred while searching for the Types + * that have the Property name. + */ + public CloseableIterator<Type> search(RyaURI propertyName) throws TypeStorageException; + + /** + * Deletes a {@link Type} from the storage. + * + * @param typeId - The ID of the {@link Type} to delete. (not null) + * @return {@code true} if something was deleted; otherwise {@code false}. + * @throws TypeStorageException A problem occurred while deleting from the storage. + */ + public boolean delete(RyaURI typeId) throws TypeStorageException; + + /** + * A problem occurred while interacting with a {@link TypeStorage}. + */ + public static class TypeStorageException extends EntityIndexException { + private static final long serialVersionUID = 1L; + + /** + * Constructs a new exception with the specified detail message. The + * cause is not initialized, and may subsequently be initialized by + * a call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public TypeStorageException(String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and + * cause. <p>Note that the detail message associated with + * {@code cause} is <i>not</i> automatically incorporated in + * this exception's detail message. + * + * @param message the detail message (which is saved for later retrieval + * by the {@link #getMessage()} method). + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A <tt>null</tt> value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + */ + public TypeStorageException(String message, Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java new file mode 100644 index 0000000..31a9308 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.function.Function; + +import org.apache.rya.indexing.entity.storage.CloseableIterator; +import org.bson.Document; + +import com.mongodb.client.MongoCursor; + +/** + * Converts the {@link Document}s that are returned by a {@link MongoCursor} + * using a {@link DocumentConverter} every time {@link #next()} is invoked. + * + * @param <T> - The type of object the Documents are converted into. + */ +public class ConvertingCursor<T> implements CloseableIterator<T> { + + private final Converter<T> converter; + private final MongoCursor<Document> cursor; + + /** + * Constructs an instance of {@link ConvertingCursor}. + * + * @param converter - Converts the {@link Document}s returned by {@code cursor}. (not null) + * @param cursor - Retrieves the {@link Document}s from a Mongo DB instance. (not null) + */ + public ConvertingCursor(Converter<T> converter, MongoCursor<Document> cursor) { + this.converter = requireNonNull(converter); + this.cursor = requireNonNull(cursor); + } + + @Override + public boolean hasNext() { + return cursor.hasNext(); + } + + @Override + public T next() { + return converter.apply( cursor.next() ); + } + + @Override + public void close() throws IOException { + cursor.close(); + } + + /** + * Converts a {@link Document} into some other object. + * + * @param <R> The type of object the Document is converted into. + */ + @FunctionalInterface + public static interface Converter<R> extends Function<Document, R> { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java new file mode 100644 index 0000000..da3e8b6 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.bson.Document; + +/** + * Converts an object to/from a {@link Document}. + * + * @param <T> - The type of object that is converted to/from a {@link Document}. + */ +@ParametersAreNonnullByDefault +public interface DocumentConverter<T> { + + /** + * Converts an object into a {@link Document}. + * + * @param object - The object to convert. (not null) + * @return A {@link Document} representing the object. + * @throws DocumentConverterException A problem occurred while converting the object. + */ + public Document toDocument(T object) throws DocumentConverterException; + + /** + * Converts a {@link Document} into the target object. + * + * @param document - The document to convert. (not null) + * @return The target object representation of the document. + * @throws DocumentConverterException A problem occurred while converting the {@link Document}. + */ + public T fromDocument(Document document) throws DocumentConverterException; + + /** + * A problem occurred while converting an object while using a {@link DocumentConverter}. + */ + public static class DocumentConverterException extends Exception { + private static final long serialVersionUID = 1L; + + public DocumentConverterException(String message) { + super(message); + } + + public DocumentConverterException(String message, Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java new file mode 100644 index 0000000..6a2fb74 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.stream.Collectors; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.model.Property; +import org.bson.Document; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; + +/** + * Converts between {@link Entity} and {@link Document}. + */ +@ParametersAreNonnullByDefault +public class EntityDocumentConverter implements DocumentConverter<Entity> { + + public static final String SUBJECT = "_id"; + public static final String EXPLICIT_TYPE_IDS = "explicitTypeIds"; + public static final String PROPERTIES = "properties"; + public static final String VERSION = "version"; + + private final RyaTypeDocumentConverter ryaTypeConverter = new RyaTypeDocumentConverter(); + + @Override + public Document toDocument(Entity entity) { + requireNonNull(entity); + + final Document doc = new Document(); + doc.append(SUBJECT, entity.getSubject().getData()); + + doc.append(EXPLICIT_TYPE_IDS, entity.getExplicitTypeIds().stream() + .map(explicitTypeId -> explicitTypeId.getData()) + .collect(Collectors.toList())); + + final Document propertiesDoc = new Document(); + for(final RyaURI typeId : entity.getProperties().keySet()) { + final Document typePropertiesDoc = new Document(); + entity.getProperties().get(typeId) + .forEach((propertyNameUri, property) -> { + final String propertyName = property.getName().getData(); + final RyaType value = property.getValue(); + typePropertiesDoc.append(propertyName, ryaTypeConverter.toDocument(value)); + }); + propertiesDoc.append(typeId.getData(), typePropertiesDoc); + } + doc.append(PROPERTIES, propertiesDoc); + + doc.append(VERSION, entity.getVersion()); + + return doc; + } + + @Override + public Entity fromDocument(Document document) throws DocumentConverterException { + requireNonNull(document); + + // Preconditions. + if(!document.containsKey(SUBJECT)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + SUBJECT + "' field is missing."); + } + + if(!document.containsKey(EXPLICIT_TYPE_IDS)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + EXPLICIT_TYPE_IDS + "' field is missing."); + } + + if(!document.containsKey(PROPERTIES)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + PROPERTIES + "' field is missing."); + } + + if(!document.containsKey(VERSION)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + VERSION + "' field is missing."); + } + + // Perform the conversion. + final Entity.Builder builder = Entity.builder() + .setSubject( new RyaURI(document.getString(SUBJECT)) ); + + ((List<String>)document.get(EXPLICIT_TYPE_IDS)).stream() + .forEach(explicitTypeId -> builder.setExplicitType(new RyaURI(explicitTypeId))); + + final Document propertiesDoc = (Document) document.get(PROPERTIES); + for(final String typeId : propertiesDoc.keySet()) { + final Document typePropertiesDoc = (Document) propertiesDoc.get(typeId); + for(final String propertyName : typePropertiesDoc.keySet()) { + final Document value = (Document) typePropertiesDoc.get(propertyName); + final RyaType propertyValue = ryaTypeConverter.fromDocument( value ); + builder.setProperty(new RyaURI(typeId), new Property(new RyaURI(propertyName), propertyValue)); + } + } + + builder.setVersion( document.getInteger(VERSION) ); + + return builder.build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java new file mode 100644 index 0000000..8a5f6da --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.model.Property; +import org.apache.rya.indexing.entity.model.Type; +import org.apache.rya.indexing.entity.model.TypedEntity; +import org.apache.rya.indexing.entity.storage.CloseableIterator; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor.Converter; +import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.google.common.base.Joiner; +import com.mongodb.ErrorCategory; +import com.mongodb.MongoClient; +import com.mongodb.MongoException; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; + +import mvm.rya.api.domain.RyaURI; + +/** + * A Mongo DB implementation of {@link EntityStorage}. + */ +@ParametersAreNonnullByDefault +public class MongoEntityStorage implements EntityStorage { + + private static final String COLLECTION_NAME = "entity-entities"; + + private static final EntityDocumentConverter ENTITY_CONVERTER = new EntityDocumentConverter(); + + /** + * A client connected to the Mongo instance that hosts the Rya instance. + */ + private final MongoClient mongo; + + /** + * The name of the Rya instance the {@link TypedEntity}s are for. + */ + private final String ryaInstanceName; + + /** + * Constructs an instance of {@link MongoEntityStorage}. + * + * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null) + * @param ryaInstanceName - The name of the Rya instance the {@link TypedEntity}s are for. (not null) + */ + public MongoEntityStorage(MongoClient mongo, String ryaInstanceName) { + this.mongo = requireNonNull(mongo); + this.ryaInstanceName = requireNonNull(ryaInstanceName); + } + + @Override + public void create(Entity entity) throws EntityStorageException { + requireNonNull(entity); + + try { + mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .insertOne( ENTITY_CONVERTER.toDocument(entity) ); + + } catch(final MongoException e) { + final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() ); + if(category == ErrorCategory.DUPLICATE_KEY) { + throw new EntityAlreadyExistsException("Failed to create Entity with Subject '" + entity.getSubject().getData() + "'.", e); + } + throw new EntityStorageException("Failed to create Entity with Subject '" + entity.getSubject().getData() + "'.", e); + } + } + + @Override + public void update(Entity old, Entity updated) throws StaleUpdateException, EntityStorageException { + requireNonNull(old); + requireNonNull(updated); + + // The updated entity must have the same Subject as the one it is replacing. + if(!old.getSubject().equals(updated.getSubject())) { + throw new EntityStorageException("The old Entity and the updated Entity must have the same Subject. " + + "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData()); + } + + // Make sure the updated Entity has a higher verison. + if(old.getVersion() >= updated.getVersion()) { + throw new EntityStorageException("The old Entity's version must be less than the updated Entity's version." + + " Old version: " + old.getVersion() + " Updated version: " + updated.getVersion()); + } + + final Set<Bson> filters = new HashSet<>(); + + // Must match the old entity's Subject. + filters.add( makeSubjectFilter(old.getSubject()) ); + + // Must match the old entity's Version. + filters.add( makeVersionFilter(old.getVersion()) ); + + // Do a find and replace. + final Bson oldEntityFilter = Filters.and(filters); + final Document updatedDoc = ENTITY_CONVERTER.toDocument(updated); + + final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME); + if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) { + throw new StaleUpdateException("Could not update the Entity with Subject '" + updated.getSubject().getData() + "."); + } + } + + @Override + public Optional<Entity> get(RyaURI subject) throws EntityStorageException { + requireNonNull(subject); + + try { + final Document document = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( Filters.eq(EntityDocumentConverter.SUBJECT, subject.getData()) ) + .first(); + + return document == null ? + Optional.empty() : + Optional.of( ENTITY_CONVERTER.fromDocument(document) ); + + } catch(final MongoException | DocumentConverterException e) { + throw new EntityStorageException("Could not get the Entity with Subject '" + subject.getData() + "'.", e); + } + } + + @Override + public CloseableIterator<TypedEntity> search(final Type type, final Set<Property> properties) throws EntityStorageException { + requireNonNull(type); + requireNonNull(properties); + + try { + // Match the specified Property values. + final Set<Bson> filters = properties.stream() + .flatMap(property -> makePropertyFilters(type.getId(), property)) + .collect(Collectors.toSet()); + + // Only match explicitly Typed entities. + filters.add( makeExplicitTypeFilter(type.getId()) ); + + // Get a cursor over the Mongo Document that represent the search results. + final MongoCursor<Document> cursor = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find(Filters.and(filters)) + .iterator(); + + // Define that Converter that converts from Document into TypedEntity. + final Converter<TypedEntity> converter = document -> { + try { + final Entity entity = ENTITY_CONVERTER.fromDocument(document); + final Optional<TypedEntity> typedEntity = entity.makeTypedEntity( type.getId() ); + if(!typedEntity.isPresent()) { + throw new RuntimeException("Entity with Subject '" + entity.getSubject() + + "' could not be cast into Type '" + type.getId() + "'."); + } + return typedEntity.get(); + + } catch (final DocumentConverterException e) { + throw new RuntimeException("Document '" + document + "' could not be parsed into an Entity.", e); + } + }; + + // Return a cursor that performs the conversion. + return new ConvertingCursor<TypedEntity>(converter, cursor); + + } catch(final MongoException e) { + throw new EntityStorageException("Could not search Entity.", e); + } + } + + @Override + public boolean delete(RyaURI subject) throws EntityStorageException { + requireNonNull(subject); + + try { + final Document deleted = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .findOneAndDelete( makeSubjectFilter(subject) ); + + return deleted != null; + + } catch(final MongoException e) { + throw new EntityStorageException("Could not delete the Entity with Subject '" + subject.getData() + "'.", e); + } + } + + private static Bson makeSubjectFilter(RyaURI subject) { + return Filters.eq(EntityDocumentConverter.SUBJECT, subject.getData()); + } + + private static Bson makeVersionFilter(int version) { + return Filters.eq(EntityDocumentConverter.VERSION, version); + } + + private static Bson makeExplicitTypeFilter(RyaURI typeId) { + return Filters.eq(EntityDocumentConverter.EXPLICIT_TYPE_IDS, typeId.getData()); + } + + private static Stream<Bson> makePropertyFilters(RyaURI typeId, Property property) { + final String propertyName = property.getName().getData(); + + // Must match the property's data type. + final String dataTypePath = Joiner.on(".").join( + new String[]{EntityDocumentConverter.PROPERTIES, typeId.getData(), propertyName, RyaTypeDocumentConverter.DATA_TYPE}); + final String propertyDataType = property.getValue().getDataType().stringValue(); + final Bson dataTypeFilter = Filters.eq(dataTypePath, propertyDataType); + + // Must match the property's value. + final String valuePath = Joiner.on(".").join( + new String[]{EntityDocumentConverter.PROPERTIES, typeId.getData(), propertyName, RyaTypeDocumentConverter.VALUE}); + final String propertyValue = property.getValue().getData(); + final Bson valueFilter = Filters.eq(valuePath, propertyValue); + + return Stream.of(dataTypeFilter, valueFilter); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java new file mode 100644 index 0000000..88a0aa1 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.model.Type; +import org.apache.rya.indexing.entity.storage.CloseableIterator; +import org.apache.rya.indexing.entity.storage.TypeStorage; +import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.google.common.base.Optional; +import com.mongodb.MongoClient; +import com.mongodb.MongoException; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; + +import mvm.rya.api.domain.RyaURI; + +/** + * A Mongo DB implementation of {@link TypeStorage}. + */ +@ParametersAreNonnullByDefault +public class MongoTypeStorage implements TypeStorage { + + private static final String COLLECTION_NAME = "entity-types"; + + private static final TypeDocumentConverter TYPE_CONVERTER = new TypeDocumentConverter(); + + /** + * A client connected to the Mongo instance that hosts the Rya instance. + */ + private final MongoClient mongo; + + /** + * The name of the Rya instance the {@link Type}s are for. + */ + private final String ryaInstanceName; + + /** + * Constructs an instance of {@link MongoTypeStorage}. + * + * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null) + * @param ryaInstanceName - The name of the Rya instance the {@link Type}s are for. (not null) + */ + public MongoTypeStorage(MongoClient mongo, String ryaInstanceName) { + this.mongo = requireNonNull(mongo); + this.ryaInstanceName = requireNonNull(ryaInstanceName); + } + + @Override + public void create(Type type) throws TypeStorageException { + requireNonNull(type); + + try { + mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .insertOne(TYPE_CONVERTER.toDocument(type)); + + } catch(final MongoException e) { + throw new TypeStorageException("Failed to create Type with ID '" + type.getId().getData() + "'.", e); + } + } + + @Override + public Optional<Type> get(RyaURI typeId) throws TypeStorageException { + requireNonNull(typeId); + + try { + final Document document = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( makeIdFilter(typeId) ) + .first(); + + return document == null ? + Optional.absent() : + Optional.of( TYPE_CONVERTER.fromDocument(document) ); + + } catch(final MongoException | DocumentConverterException e) { + throw new TypeStorageException("Could not get the Type with ID '" + typeId.getData() + "'.", e); + } + } + + @Override + public CloseableIterator<Type> search(RyaURI propertyName) throws TypeStorageException { + requireNonNull(propertyName); + + try { + // Create a Filter that finds Types who have the provided property names. + final Bson byPropertyName = Filters.eq(TypeDocumentConverter.PROPERTY_NAMES, propertyName.getData()); + + final MongoCursor<Document> cursor = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( byPropertyName ) + .iterator(); + + return new ConvertingCursor<Type>(document -> { + try { + return TYPE_CONVERTER.fromDocument(document); + } catch (final Exception e) { + throw new RuntimeException("Could not convert the Document '" + document + "' into a Type.", e); + } + }, cursor); + + } catch(final MongoException e) { + throw new TypeStorageException("Could not fetch Types that include the property '" + propertyName.getData() + "'.", e); + } + } + + @Override + public boolean delete(RyaURI typeId) throws TypeStorageException { + requireNonNull(typeId); + + try { + final Document deleted = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .findOneAndDelete( makeIdFilter(typeId) ); + + return deleted != null; + + } catch(final MongoException e) { + throw new TypeStorageException("Could not delete the Type with ID '" + typeId.getData() + "'.", e); + } + } + + private static Bson makeIdFilter(RyaURI typeId) { + return Filters.eq(TypeDocumentConverter.ID, typeId.getData()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java new file mode 100644 index 0000000..f7fd4ac --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.bson.Document; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +import mvm.rya.api.domain.RyaType; + +/** + * Converts between {@link RyaType} and {@link Document}. + */ +@ParametersAreNonnullByDefault +public class RyaTypeDocumentConverter implements DocumentConverter<RyaType> { + + private static final ValueFactory VF = new ValueFactoryImpl(); + + public static final String DATA_TYPE = "dataType"; + public static final String VALUE = "value"; + + @Override + public Document toDocument(RyaType ryaType) { + requireNonNull(ryaType); + + return new Document() + .append(DATA_TYPE, ryaType.getDataType().toString()) + .append(VALUE, ryaType.getData()); + } + + @Override + public RyaType fromDocument(Document document) throws DocumentConverterException { + requireNonNull(document); + + if(!document.containsKey(DATA_TYPE)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + DATA_TYPE + "' field is missing."); + } + + if(!document.containsKey(VALUE)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + VALUE + "' field is missing."); + } + + return new RyaType( + VF.createURI( document.getString(DATA_TYPE) ), + document.getString(VALUE)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java new file mode 100644 index 0000000..104fbad --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.model.Type; +import org.bson.Document; + +import com.google.common.collect.ImmutableSet; + +import mvm.rya.api.domain.RyaURI; + +/** + * Converts between {@link Type} and {@link Document}. + */ +@ParametersAreNonnullByDefault +public class TypeDocumentConverter implements DocumentConverter<Type> { + + public static final String ID = "_id"; + public static final String PROPERTY_NAMES = "propertyNames"; + + @Override + public Document toDocument(final Type type) { + requireNonNull(type); + + final Document doc = new Document(); + doc.append(ID, type.getId().getData()); + + final List<String> propertyNames = new ArrayList<>(); + type.getPropertyNames().forEach(field -> propertyNames.add(field.getData())); + doc.append(PROPERTY_NAMES, propertyNames); + + return doc; + } + + @Override + public Type fromDocument(final Document document) throws DocumentConverterException { + requireNonNull(document); + + if(!document.containsKey(ID)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + ID + "' field is missing."); + } + + if(!document.containsKey(PROPERTY_NAMES)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + PROPERTY_NAMES + "' field is missing."); + } + + final RyaURI typeId = new RyaURI( document.getString(ID) ); + + final ImmutableSet.Builder<RyaURI> propertyNames = ImmutableSet.builder(); + ((List<String>) document.get(PROPERTY_NAMES)) + .forEach(propertyName -> propertyNames.add(new RyaURI(propertyName))); + + return new Type(typeId, propertyNames.build()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java new file mode 100644 index 0000000..0bd9261 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.update; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Collections.singleton; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.groupingBy; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.model.Property; +import org.apache.rya.indexing.entity.model.Type; +import org.apache.rya.indexing.entity.storage.CloseableIterator; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.TypeStorage; +import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; +import org.apache.rya.indexing.entity.storage.TypeStorage.TypeStorageException; +import org.openrdf.model.URI; +import org.openrdf.model.vocabulary.RDF; + +import com.google.common.base.Objects; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +/** + * A base class that may be used to update an {@link EntityStorage} as new + * {@link RyaStatement}s are added to/removed from the Rya instance. + */ +@ParametersAreNonnullByDefault +public abstract class BaseEntityIndexer implements EntityIndexer { + + /** + * When this URI is the Predicate of a Statement, it indicates a {@link Type} for an {@link Entity}. + */ + private static final RyaURI TYPE_URI = new RyaURI( RDF.TYPE.toString() ); + + private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>(); + private final AtomicReference<EntityStorage> entities = new AtomicReference<>(); + private final AtomicReference<TypeStorage> types = new AtomicReference<>(); + + /** + * Creates the {@link EntityStorage} that will be used by the indexer. + * + * @param conf - Indicates how the {@link EntityStorage} is initialized. (not null) + * @return The {@link EntityStorage} that will be used by this indexer. + */ + public abstract @Nullable EntityStorage getEntityStorage(Configuration conf); + + /** + * Creates the {@link TypeStorage} that will be used by the indexer. + * + * @param conf - Indicates how the {@link TypeStorage} is initialized. (not null) + * @return The {@link TypeStorage} that will be used by this indexer. + */ + public abstract @Nullable TypeStorage getTypeStorage(Configuration conf); + + @Override + public void setConf(Configuration conf) { + requireNonNull(conf); + entities.set( getEntityStorage(conf) ); + types.set( getTypeStorage(conf) ); + } + + @Override + public Configuration getConf() { + return configuration.get(); + } + + @Override + public void storeStatement(RyaStatement statement) throws IOException { + requireNonNull(statement); + storeStatements( singleton(statement) ); + } + + @Override + public void storeStatements(Collection<RyaStatement> statements) throws IOException { + requireNonNull(statements); + + final Map<RyaURI,List<RyaStatement>> groupedBySubject = statements.stream() + .collect(groupingBy(RyaStatement::getSubject)); + + for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) { + try { + updateEntity(entry.getKey(), entry.getValue()); + } catch (final EntityStorageException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + } + + /** + * Updates a {@link Entity} to reflect new {@link RyaStatement}s. + * + * @param subject - The Subject of the {@link Entity} the statements are for. (not null) + * @param statements - Statements that the {@link Entity} will be updated with. (not null) + */ + private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException { + requireNonNull(subject); + requireNonNull(statements); + + final EntityStorage entities = this.entities.get(); + final TypeStorage types = this.types.get(); + checkState(entities != null, "Must set this indexers configuration before storing statements."); + checkState(types != null, "Must set this indexers configuration before storing statements."); + + new EntityUpdater(entities).update(subject, old -> { + // Create a builder with the updated Version. + final Entity.Builder updated; + if(!old.isPresent()) { + updated = Entity.builder() + .setSubject(subject) + .setVersion(0); + } else { + final int updatedVersion = old.get().getVersion() + 1; + updated = Entity.builder(old.get()) + .setVersion( updatedVersion ); + } + + // Update the entity based on the Statements. + for(final RyaStatement statement : statements) { + + // The Statement is setting an Explicit Type ID for the Entity. + if(Objects.equal(TYPE_URI, statement.getPredicate())) { + final RyaURI typeId = new RyaURI(statement.getObject().getData()); + updated.setExplicitType(typeId); + } + + // The Statement is adding a Property to the Entity. + else { + final RyaURI propertyName = statement.getPredicate(); + final RyaType propertyValue = statement.getObject(); + + try(final CloseableIterator<Type> typesIt = types.search(propertyName)) { + // Set the Property for each type that includes the Statement's predicate. + while(typesIt.hasNext()) { + final RyaURI typeId = typesIt.next().getId(); + updated.setProperty(typeId, new Property(propertyName, propertyValue)); + } + } catch (final TypeStorageException | IOException e) { + throw new RuntimeException("Failed to fetch Types that include the property name '" + + statement.getPredicate().getData() + "'.", e); + } + } + } + + return Optional.of( updated.build() ); + }); + } + + @Override + public void deleteStatement(RyaStatement statement) throws IOException { + requireNonNull(statement); + + final EntityStorage entities = this.entities.get(); + final TypeStorage types = this.types.get(); + checkState(entities != null, "Must set this indexers configuration before storing statements."); + checkState(types != null, "Must set this indexers configuration before storing statements."); + + try { + new EntityUpdater(entities).update(statement.getSubject(), old -> { + // If there is no Entity for the subject of the statement, then do nothing. + if(!old.isPresent()) { + return Optional.empty(); + } + + final Entity oldEntity = old.get(); + + // Increment the version of the Entity. + final Entity.Builder updated = Entity.builder(oldEntity); + updated.setVersion(oldEntity.getVersion() + 1); + + if(TYPE_URI.equals(statement.getPredicate())) { + // If the Type ID already isn't in the list of explicit types, then do nothing. + final RyaURI typeId = new RyaURI( statement.getObject().getData() ); + if(!oldEntity.getExplicitTypeIds().contains(typeId)) { + return Optional.empty(); + } + + // Otherwise remove it from the list. + updated.unsetExplicitType(typeId); + } else { + // If the deleted property appears within the old entity's properties, then remove it. + final RyaURI deletedPropertyName = statement.getPredicate(); + + boolean propertyWasPresent = false; + for(final RyaURI typeId : oldEntity.getProperties().keySet()) { + for(final RyaURI propertyName : oldEntity.getProperties().get(typeId).keySet()) { + if(deletedPropertyName.equals(propertyName)) { + propertyWasPresent = true; + updated.unsetProperty(typeId, deletedPropertyName); + } + } + } + + // If no properties were removed, then do nothing. + if(!propertyWasPresent) { + return Optional.empty(); + } + } + + return Optional.of( updated.build() ); + }); + } catch (final EntityStorageException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + + @Override + public String getTableName() { + // Storage details have been abstracted away from the indexer. + return null; + } + + @Override + public void flush() throws IOException { + // We do not need to do anything to flush since we do not batch work. + } + + @Override + public void close() throws IOException { + // Nothing to close. + } + + @Override + public void dropGraph(RyaURI... graphs) { + // We do not support graphs when performing entity centric indexing. + } + + @Override + public Set<URI> getIndexablePredicates() { + // This isn't used anywhere in Rya, so it will not be implemented. + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java new file mode 100644 index 0000000..9ef5dfd --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.update; + +import org.apache.rya.indexing.entity.storage.EntityStorage; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +/** + * Updates the {@link Entity}s that are in a {@link EntityStorage} when new + * {@link RyaStatement}s are added/removed from the Rya instance. + */ +public interface EntityIndexer extends RyaSecondaryIndexer { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java new file mode 100644 index 0000000..4bb003f --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.update; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.function.Function; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.EntityStorage.EntityAlreadyExistsException; +import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; +import org.apache.rya.indexing.entity.storage.EntityStorage.StaleUpdateException; + +import mvm.rya.api.domain.RyaURI; + +/** + * Performs update operations over an {@link EntityStorage}. + */ +@ParametersAreNonnullByDefault +public class EntityUpdater { + + private final EntityStorage storage; + + /** + * Constructs an instance of {@link EntityUpdater}. + * + * @param storage - The storage this updater operates over. (not null) + */ + public EntityUpdater(EntityStorage storage) { + this.storage = requireNonNull(storage); + } + + /** + * Tries to updates the state of an {@link Entity} until the update succeeds + * or a non-recoverable exception is thrown. + * + * @param subject - The Subject of the {@link Entity} that will be updated. (not null) + * @param mutator - Performs the mutation on the old state of the Entity and returns + * the new state of the Entity. (not null) + * @throws EntityStorageException A non-recoverable error has caused the update to fail. + */ + public void update(RyaURI subject, EntityMutator mutator) throws EntityStorageException { + requireNonNull(subject); + requireNonNull(mutator); + + // Fetch the current state of the Entity. + boolean completed = false; + while(!completed) { + try { + final Optional<Entity> old = storage.get(subject); + final Optional<Entity> updated = mutator.apply(old); + + final boolean doWork = updated.isPresent(); + if(doWork) { + if(!old.isPresent()) { + storage.create(updated.get()); + } else { + storage.update(old.get(), updated.get()); + } + } + completed = true; + } catch(final EntityAlreadyExistsException | StaleUpdateException e) { + // These are recoverable exceptions. Try again. + } catch(final RuntimeException e) { + throw new EntityStorageException("Failed to update Entity with Subject '" + subject.getData() + "'.", e); + } + } + } + + /** + * Implementations of this interface are used to update the state of an + * {@link Entity} in unison with a {@link EntityUpdater}. + * </p> + * This table describes what the updater will do depending on if an Entity + * exists and if an updated Entity is returned. + * </p> + * <table border="1px"> + * <tr><th>Entity Provided</th><th>Update Returned</th><th>Effect</th></tr> + * <tr> + * <td>true</td> + * <td>true</td> + * <td>The old Entity will be updated using the returned state.</td> + * </tr> + * <tr> + * <td>true</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>true</td> + * <td>A new Entity will be created using the returned state.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * </table> + */ + public interface EntityMutator extends Function<Optional<Entity>, Optional<Entity>> { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java new file mode 100644 index 0000000..71b5198 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.entity.update.mongo; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.TypeStorage; +import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; +import org.apache.rya.indexing.entity.storage.mongo.MongoTypeStorage; +import org.apache.rya.indexing.entity.update.BaseEntityIndexer; +import org.apache.rya.indexing.entity.update.EntityIndexer; + +import com.mongodb.MongoClient; + +import mvm.rya.mongodb.MongoConnectorFactory; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +/** + * A Mongo DB implementation of {@link EntityIndexer}. + */ +@ParametersAreNonnullByDefault +public class MongoEntityIndexer extends BaseEntityIndexer { + + @Override + public EntityStorage getEntityStorage(Configuration conf) { + final MongoClient mongoClient = MongoConnectorFactory.getMongoClient(conf); + final String ryaInstanceName = new MongoDBRdfConfiguration(conf).getMongoDBName(); + return new MongoEntityStorage(mongoClient, ryaInstanceName); + } + + @Override + public TypeStorage getTypeStorage(Configuration conf) { + final MongoClient mongoClient = MongoConnectorFactory.getMongoClient(conf); + final String ryaInstanceName = new MongoDBRdfConfiguration(conf).getMongoDBName(); + return new MongoTypeStorage(mongoClient, ryaInstanceName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java deleted file mode 100644 index ab263e0..0000000 --- a/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.entity.query; - -import static org.mockito.Mockito.mock; - -import java.util.List; - -import org.junit.Test; -import org.openrdf.model.vocabulary.RDF; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.helpers.StatementPatternCollector; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.collect.ImmutableSet; - -import mvm.rya.api.domain.RyaURI; -import mvm.rya.indexing.entity.model.Type; -import mvm.rya.indexing.entity.storage.EntityStorage; - -/** - * Unit tests the methods of {@link EntityQueryNode}. - */ -public class EntityQueryNodeTest { - - private static final Type PERSON_TYPE = - new Type(new RyaURI("urn:person"), - ImmutableSet.<RyaURI>builder() - .add(new RyaURI("urn:name")) - .add(new RyaURI("urn:age")) - .add(new RyaURI("urn:eye")) - .build()); - - private static final Type EMPLOYEE_TYPE = - new Type(new RyaURI("urn:employee"), - ImmutableSet.<RyaURI>builder() - .add(new RyaURI("urn:name")) - .add(new RyaURI("urn:hoursPerWeek")) - .build()); - - @Test(expected = IllegalStateException.class) - public void constructor_differentSubjects() throws Exception { - // A pattern that has two different subjects. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "<urn:SSN:111-11-1111> <" + RDF.TYPE + "> <urn:person> ."+ - "<urn:SSN:111-11-1111> <urn:age> ?age . " + - "<urn:SSN:111-11-1111> <urn:eye> ?eye . " + - "<urn:SSN:111-11-1111> <urn:name> ?name . " + - "<urn:SSN:222-22-2222> <urn:age> ?age . " + - "<urn:SSN:222-22-2222> <urn:eye> ?eye . " + - "<urn:SSN:222-22-2222> <urn:name> ?name . " + - "}"); - - - // This will fail. - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - } - - @Test(expected = IllegalStateException.class) - public void constructor_variablePredicate() throws Exception { - // A pattern that has a variable for its predicate. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "?subject <" + RDF.TYPE + "> <urn:person> ."+ - "?subject ?variableProperty ?value . " + - "?subject <urn:eye> ?eye . " + - "?subject <urn:name> ?name . " + - "}"); - - - // This will fail. - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - } - - @Test(expected = IllegalStateException.class) - public void constructor_predicateNotPartOfType() throws Exception { - // A pattern that does uses a predicate that is not part of the type. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "?subject <" + RDF.TYPE + "> <urn:person> ."+ - "?subject <urn:age> ?age . " + - "?subject <urn:eye> ?eye . " + - "?subject <urn:name> ?name . " + - "?subject <urn:notPartOfType> ?value . " + - "}"); - - // This will fail. - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - } - - @Test(expected = IllegalStateException.class) - public void constructor_typeMissing() throws Exception { - // A pattern that does uses a predicate that is not part of the type. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "?subject <urn:age> ?age . " + - "?subject <urn:eye> ?eye . " + - "?subject <urn:name> ?name . " + - "}"); - - // This will fail. - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - } - - @Test(expected = IllegalStateException.class) - public void constructor_wrongType() throws Exception { - // A pattern that does uses a predicate that is not part of the type. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "?subject <" + RDF.TYPE + "> <urn:person> ."+ - "?subject <urn:age> ?age . " + - "?subject <urn:eye> ?eye . " + - "?subject <urn:name> ?name . " + - "}"); - - // This will fail. - new EntityQueryNode(EMPLOYEE_TYPE, patterns, mock(EntityStorage.class)); - } - - // Happy path test. - - // TODO test for all of the types of preconditions - // test when a binding set can join - // test when a binding set does not join - // test when there are constants that are part of the query - // test when there are variables that are part of the query. - - @Test - public void evaluate_constantSubject() throws Exception { - // A set of patterns that match a sepecific Entity subject. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "<urn:SSN:111-11-1111> <" + RDF.TYPE + "> <urn:person> ."+ - "<urn:SSN:111-11-1111> <urn:age> ?age . " + - "<urn:SSN:111-11-1111> <urn:eye> ?eye . " + - "<urn:SSN:111-11-1111> <urn:name> ?name . " + - "}"); - - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - - - // TODO implement - } - - @Test - public void evaluate_variableSubject() throws Exception { - // A set of patterns that matches a variable Entity subject. - final List<StatementPattern> patterns = getSPs( - "SELECT * WHERE { " + - "?subject <" + RDF.TYPE + "> <urn:person> ."+ - "?subject <urn:age> ?age . " + - "?subject <urn:eye> ?eye . " + - "?subject <urn:name> ?name . " + - "}"); - - new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class)); - - - // TODO implement - } - - - - /** - * TODO doc - * - * @param sparql - * @return - * @throws MalformedQueryException - */ - private static List<StatementPattern> getSPs(String sparql) throws MalformedQueryException { - final StatementPatternCollector spCollector = new StatementPatternCollector(); - new SPARQLParser().parseQuery(sparql, null).getTupleExpr().visit(spCollector); - return spCollector.getStatementPatterns(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java deleted file mode 100644 index bbd3f06..0000000 --- a/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.entity.storage.mongo; - -import static org.junit.Assert.assertEquals; - -import org.bson.Document; -import org.junit.Test; -import org.openrdf.model.vocabulary.XMLSchema; - -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.indexing.entity.model.Entity; -import mvm.rya.indexing.entity.model.Property; -import mvm.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; - -/** - * Tests the methods of {@link EntityDocumentConverter}. - */ -public class EntityDocumentConverterTest { - - @Test - public void to_and_from_document() throws DocumentConverterException { - // Convert an Entity into a Document. - final Entity entity = Entity.builder() - .setSubject(new RyaURI("urn:alice")) - // Add some explicily typed properties. - .setExplicitType(new RyaURI("urn:person")) - .setProperty(new RyaURI("urn:person"), new Property(new RyaURI("urn:age"), new RyaType(XMLSchema.INT, "30"))) - .setProperty(new RyaURI("urn:person"), new Property(new RyaURI("urn:eye"), new RyaType("blue"))) - // Add some implicitly typed properties. - .setProperty(new RyaURI("urn:employee"), new Property(new RyaURI("urn:hours"), new RyaType(XMLSchema.INT, "40"))) - .setProperty(new RyaURI("urn:employee"), new Property(new RyaURI("urn:employer"), new RyaType("Burger Joint"))) - .build(); - - final Document document = new EntityDocumentConverter().toDocument(entity); - - // Convert the Document back into an Entity. - final Entity converted = new EntityDocumentConverter().fromDocument(document); - - // Ensure the original matches the round trip converted Entity. - assertEquals(entity, converted); - } -} \ No newline at end of file
