http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java
new file mode 100644
index 0000000..2d4622f
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/TypeStorage.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.EntityIndexException;
+import org.apache.rya.indexing.entity.model.Type;
+
+import com.google.common.base.Optional;
+
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * A store of {@link Type}s that can be created, fetched, searched, and deleted.
+ */
+@ParametersAreNonnullByDefault
+public interface TypeStorage {
+
+    /**
+     * Adds a new {@link Type} to the storage. The ID of the new Type must be unique.
+     *
+     * @param type - The {@link Type} to add. (not null)
+     * @throws TypeStorageException The Type could not be created.
+     */
+    void create(Type type) throws TypeStorageException;
+
+    /**
+     * Looks up a {@link Type} in the storage using its ID.
+     *
+     * @param typeId - The ID of the {@link Type} to fetch. (not null)
+     * @return The {@link Type} if the storage holds one for the ID.
+     * @throws TypeStorageException The storage could not be read.
+     */
+    Optional<Type> get(RyaURI typeId) throws TypeStorageException;
+
+    /**
+     * Finds every {@link Type} that includes a specific
+     * {@link org.apache.rya.indexing.entity.model.Property} name.
+     *
+     * @param propertyName - The Property name to look for. (not null)
+     * @return Every {@link Type} that includes {@code propertyName}.
+     * @throws TypeStorageException The Types that have the Property name
+     *   could not be searched for.
+     */
+    CloseableIterator<Type> search(RyaURI propertyName) throws TypeStorageException;
+
+    /**
+     * Removes a {@link Type} from the storage.
+     *
+     * @param typeId - The ID of the {@link Type} to remove. (not null)
+     * @return {@code true} if something was deleted; otherwise {@code false}.
+     * @throws TypeStorageException The delete could not be performed.
+     */
+    boolean delete(RyaURI typeId) throws TypeStorageException;
+
+    /**
+     * Indicates that an interaction with a {@link TypeStorage} failed.
+     */
+    class TypeStorageException extends EntityIndexException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Creates an exception that carries only a detail message.
+         *
+         * @param message - Describes the problem; handed to the superclass constructor.
+         */
+        public TypeStorageException(String message) {
+            super(message);
+        }
+
+        /**
+         * Creates an exception that carries a detail message and the
+         * underlying cause.
+         *
+         * @param message - Describes the problem; handed to the superclass constructor.
+         * @param cause - The problem that triggered this one; handed to the
+         *   superclass constructor.
+         */
+        public TypeStorageException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java
new file mode 100644
index 0000000..31a9308
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/ConvertingCursor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+import org.apache.rya.indexing.entity.storage.CloseableIterator;
+import org.bson.Document;
+
+import com.mongodb.client.MongoCursor;
+
+/**
+ * A {@link CloseableIterator} that wraps a {@link MongoCursor} and hands every
+ * {@link Document} it produces to a {@link Converter} each time {@link #next()}
+ * is invoked.
+ *
+ * @param <T> - The type of object the Documents are converted into.
+ */
+public class ConvertingCursor<T> implements CloseableIterator<T> {
+
+    private final Converter<T> conversion;
+    private final MongoCursor<Document> documents;
+
+    /**
+     * Constructs an instance of {@link ConvertingCursor}.
+     *
+     * @param converter - Applied to each {@link Document} returned by {@code cursor}. (not null)
+     * @param cursor - The source of the {@link Document}s that will be converted. (not null)
+     */
+    public ConvertingCursor(Converter<T> converter, MongoCursor<Document> cursor) {
+        conversion = requireNonNull(converter);
+        documents = requireNonNull(cursor);
+    }
+
+    /**
+     * @return Whatever the wrapped cursor reports for {@code hasNext()}.
+     */
+    @Override
+    public boolean hasNext() {
+        return documents.hasNext();
+    }
+
+    /**
+     * @return The wrapped cursor's next {@link Document} after it has been
+     *   passed through the converter.
+     */
+    @Override
+    public T next() {
+        final Document raw = documents.next();
+        return conversion.apply(raw);
+    }
+
+    /**
+     * Closes the wrapped cursor.
+     */
+    @Override
+    public void close() throws IOException {
+        documents.close();
+    }
+
+    /**
+     * A function from a {@link Document} to some other object.
+     *
+     * @param <R> The type of object the Document is converted into.
+     */
+    @FunctionalInterface
+    public static interface Converter<R> extends Function<Document, R> { }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java
new file mode 100644
index 0000000..da3e8b6
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/DocumentConverter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.bson.Document;
+
+/**
+ * A two way mapping between some object and a {@link Document}.
+ *
+ * @param <T> - The type of object that is converted to/from a {@link Document}.
+ */
+@ParametersAreNonnullByDefault
+public interface DocumentConverter<T> {
+
+    /**
+     * Builds the {@link Document} form of an object.
+     *
+     * @param object - The object to convert. (not null)
+     * @return A {@link Document} representing the object.
+     * @throws DocumentConverterException The object could not be converted.
+     */
+    Document toDocument(T object) throws DocumentConverterException;
+
+    /**
+     * Builds the target object from its {@link Document} form.
+     *
+     * @param document - The document to convert. (not null)
+     * @return The target object representation of the document.
+     * @throws DocumentConverterException The {@link Document} could not be converted.
+     */
+    T fromDocument(Document document) throws DocumentConverterException;
+
+    /**
+     * Indicates that a conversion performed by a {@link DocumentConverter} failed.
+     */
+    class DocumentConverterException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Creates an exception that carries only a detail message.
+         *
+         * @param message - Describes the problem; handed to the superclass constructor.
+         */
+        public DocumentConverterException(String message) {
+            super(message);
+        }
+
+        /**
+         * Creates an exception that carries a detail message and the
+         * underlying cause.
+         *
+         * @param message - Describes the problem; handed to the superclass constructor.
+         * @param cause - The problem that triggered this one; handed to the
+         *   superclass constructor.
+         */
+        public DocumentConverterException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java
new file mode 100644
index 0000000..6a2fb74
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/EntityDocumentConverter.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.model.Property;
+import org.bson.Document;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * Converts between {@link Entity} and {@link Document}.
+ * <p>
+ * The produced Document has the following layout:
+ * <pre>
+ * {
+ *   "_id": &lt;subject&gt;,
+ *   "explicitTypeIds": [ &lt;type id&gt;, ... ],
+ *   "properties": { &lt;type id&gt;: { &lt;property name&gt;: &lt;RyaType document&gt;, ... }, ... },
+ *   "version": &lt;version&gt;
+ * }
+ * </pre>
+ */
+@ParametersAreNonnullByDefault
+public class EntityDocumentConverter implements DocumentConverter<Entity> {
+
+    public static final String SUBJECT = "_id";
+    public static final String EXPLICIT_TYPE_IDS = "explicitTypeIds";
+    public static final String PROPERTIES = "properties";
+    public static final String VERSION = "version";
+
+    private final RyaTypeDocumentConverter ryaTypeConverter = new RyaTypeDocumentConverter();
+
+    /**
+     * Converts an {@link Entity} into a {@link Document}.
+     *
+     * @param entity - The Entity to convert. (not null)
+     * @return A {@link Document} holding the Entity's subject, explicit type
+     *   IDs, properties (grouped by type ID), and version.
+     */
+    @Override
+    public Document toDocument(Entity entity) {
+        requireNonNull(entity);
+
+        final Document doc = new Document();
+        doc.append(SUBJECT, entity.getSubject().getData());
+
+        doc.append(EXPLICIT_TYPE_IDS, entity.getExplicitTypeIds().stream()
+                .map(explicitTypeId -> explicitTypeId.getData())
+                .collect(Collectors.toList()));
+
+        // One nested Document per Type ID, each mapping property names to their values.
+        final Document propertiesDoc = new Document();
+        entity.getProperties().forEach((typeId, typeProperties) -> {
+            final Document typePropertiesDoc = new Document();
+            typeProperties.values().forEach(property -> {
+                final String propertyName = property.getName().getData();
+                final RyaType value = property.getValue();
+                typePropertiesDoc.append(propertyName, ryaTypeConverter.toDocument(value));
+            });
+            propertiesDoc.append(typeId.getData(), typePropertiesDoc);
+        });
+        doc.append(PROPERTIES, propertiesDoc);
+
+        doc.append(VERSION, entity.getVersion());
+
+        return doc;
+    }
+
+    /**
+     * Converts a {@link Document} into an {@link Entity}.
+     *
+     * @param document - The Document to convert. (not null)
+     * @return The {@link Entity} described by the Document.
+     * @throws DocumentConverterException One of the {@link #SUBJECT},
+     *   {@link #EXPLICIT_TYPE_IDS}, {@link #PROPERTIES}, or {@link #VERSION}
+     *   fields is missing, or a field holds a value of an unexpected type.
+     */
+    @Override
+    public Entity fromDocument(Document document) throws DocumentConverterException {
+        requireNonNull(document);
+
+        // Preconditions. Every top level field must be present and hold the expected type.
+        final String subject = getField(document, SUBJECT, String.class);
+        final List<?> explicitTypeIds = getField(document, EXPLICIT_TYPE_IDS, List.class);
+        final Document propertiesDoc = getField(document, PROPERTIES, Document.class);
+        final Integer version = getField(document, VERSION, Integer.class);
+
+        // Perform the conversion.
+        final Entity.Builder builder = Entity.builder()
+                .setSubject( new RyaURI(subject) );
+
+        for(final Object explicitTypeId : explicitTypeIds) {
+            // The list's element type is not known statically, so verify each element.
+            if(!(explicitTypeId instanceof String)) {
+                throw new DocumentConverterException("Could not convert document '" + document +
+                        "' because its '" + EXPLICIT_TYPE_IDS + "' field contains a value that is not a String.");
+            }
+            builder.setExplicitType(new RyaURI((String) explicitTypeId));
+        }
+
+        for(final String typeId : propertiesDoc.keySet()) {
+            final Document typePropertiesDoc = getField(propertiesDoc, typeId, Document.class);
+            for(final String propertyName : typePropertiesDoc.keySet()) {
+                final Document value = getField(typePropertiesDoc, propertyName, Document.class);
+                final RyaType propertyValue = ryaTypeConverter.fromDocument( value );
+                builder.setProperty(new RyaURI(typeId), new Property(new RyaURI(propertyName), propertyValue));
+            }
+        }
+
+        builder.setVersion( version );
+
+        return builder.build();
+    }
+
+    /**
+     * Fetches a required field from a {@link Document}, verifying that it is
+     * present and holds a value of the expected type.
+     *
+     * @param document - The Document to read from. (not null)
+     * @param field - The name of the required field. (not null)
+     * @param type - The type the field's value must be an instance of. (not null)
+     * @return The field's value.
+     * @throws DocumentConverterException The field is missing, or its value is
+     *   {@code null} or not an instance of {@code type}.
+     */
+    private static <V> V getField(Document document, String field, Class<V> type) throws DocumentConverterException {
+        if(!document.containsKey(field)) {
+            throw new DocumentConverterException("Could not convert document '" + document +
+                    "' because its '" + field + "' field is missing.");
+        }
+
+        // isInstance(null) is false, so a present-but-null value is rejected here as well.
+        final Object value = document.get(field);
+        if(!type.isInstance(value)) {
+            throw new DocumentConverterException("Could not convert document '" + document +
+                    "' because its '" + field + "' field is not a " + type.getSimpleName() + ".");
+        }
+        return type.cast(value);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
new file mode 100644
index 0000000..8a5f6da
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.model.Property;
+import org.apache.rya.indexing.entity.model.Type;
+import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.entity.storage.CloseableIterator;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor.Converter;
+import 
org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.google.common.base.Joiner;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * A Mongo DB implementation of {@link EntityStorage}.
+ * <p>
+ * Entities are stored as {@link Document}s, produced by an
+ * {@link EntityDocumentConverter}, within the {@code entity-entities}
+ * collection of the database named after the Rya instance.
+ */
+@ParametersAreNonnullByDefault
+public class MongoEntityStorage implements EntityStorage {
+
+    private static final String COLLECTION_NAME = "entity-entities";
+
+    private static final EntityDocumentConverter ENTITY_CONVERTER = new EntityDocumentConverter();
+
+    /**
+     * A client connected to the Mongo instance that hosts the Rya instance.
+     */
+    private final MongoClient mongo;
+
+    /**
+     * The name of the Rya instance the {@link TypedEntity}s are for.
+     */
+    private final String ryaInstanceName;
+
+    /**
+     * Constructs an instance of {@link MongoEntityStorage}.
+     *
+     * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+     * @param ryaInstanceName - The name of the Rya instance the {@link TypedEntity}s are for. (not null)
+     */
+    public MongoEntityStorage(MongoClient mongo, String ryaInstanceName) {
+        this.mongo = requireNonNull(mongo);
+        this.ryaInstanceName = requireNonNull(ryaInstanceName);
+    }
+
+    /**
+     * Inserts the Document form of an {@link Entity} into the collection.
+     *
+     * @param entity - The Entity to store. (not null)
+     * @throws EntityStorageException The insert failed. When Mongo reports a
+     *   duplicate key, the thrown exception is an {@code EntityAlreadyExistsException}.
+     */
+    @Override
+    public void create(Entity entity) throws EntityStorageException {
+        requireNonNull(entity);
+
+        try {
+            mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .insertOne( ENTITY_CONVERTER.toDocument(entity) );
+
+        } catch(final MongoException e) {
+            final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() );
+            if(category == ErrorCategory.DUPLICATE_KEY) {
+                throw new EntityAlreadyExistsException("Failed to create Entity with Subject '" + entity.getSubject().getData() + "'.", e);
+            }
+            throw new EntityStorageException("Failed to create Entity with Subject '" + entity.getSubject().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Replaces the stored Document that matches the old {@link Entity}'s
+     * Subject and version with the Document form of the updated Entity.
+     *
+     * @param old - The Entity that is expected to currently be stored. (not null)
+     * @param updated - The Entity to store in its place. It must have the same
+     *   Subject as {@code old} and a higher version. (not null)
+     * @throws StaleUpdateException No stored Document matched the old Entity's
+     *   Subject and version.
+     * @throws EntityStorageException The Subjects differ, the updated version
+     *   is not higher than the old version, or Mongo reported a problem.
+     */
+    @Override
+    public void update(Entity old, Entity updated) throws StaleUpdateException, EntityStorageException {
+        requireNonNull(old);
+        requireNonNull(updated);
+
+        // The updated entity must have the same Subject as the one it is replacing.
+        if(!old.getSubject().equals(updated.getSubject())) {
+            throw new EntityStorageException("The old Entity and the updated Entity must have the same Subject. " +
+                    "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData());
+        }
+
+        // Make sure the updated Entity has a higher version.
+        if(old.getVersion() >= updated.getVersion()) {
+            throw new EntityStorageException("The old Entity's version must be less than the updated Entity's version." +
+                    " Old version: " + old.getVersion() + " Updated version: " + updated.getVersion());
+        }
+
+        final Set<Bson> filters = new HashSet<>();
+
+        // Must match the old entity's Subject.
+        filters.add( makeSubjectFilter(old.getSubject()) );
+
+        // Must match the old entity's Version.
+        filters.add( makeVersionFilter(old.getVersion()) );
+
+        // Do a find and replace.
+        final Bson oldEntityFilter = Filters.and(filters);
+        final Document updatedDoc = ENTITY_CONVERTER.toDocument(updated);
+
+        try {
+            final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME);
+
+            // A null result means nothing matched the old Subject/version pair, so nothing was replaced.
+            if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) {
+                throw new StaleUpdateException("Could not update the Entity with Subject '" + updated.getSubject().getData() + "'.");
+            }
+
+        } catch(final MongoException e) {
+            // Report driver failures the same way the other operations of this class do.
+            throw new EntityStorageException("Could not update the Entity with Subject '" + updated.getSubject().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Fetches the {@link Entity} whose stored Document matches a Subject.
+     *
+     * @param subject - The Subject of the Entity to fetch. (not null)
+     * @return The Entity if a Document was found for the Subject; otherwise empty.
+     * @throws EntityStorageException Mongo reported a problem, or the stored
+     *   Document could not be converted into an Entity.
+     */
+    @Override
+    public Optional<Entity> get(RyaURI subject) throws EntityStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document document = mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .find( makeSubjectFilter(subject) )
+                .first();
+
+            return document == null ?
+                    Optional.empty() :
+                    Optional.of( ENTITY_CONVERTER.fromDocument(document) );
+
+        } catch(final MongoException | DocumentConverterException e) {
+            throw new EntityStorageException("Could not get the Entity with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Searches for the Entities that are explicitly typed with {@code type}
+     * and whose stored values for that Type match all of {@code properties}.
+     *
+     * @param type - The Type the matched Entities must explicitly have. (not null)
+     * @param properties - The Property values that must be matched. (not null)
+     * @return An iterator that converts each matching Document into a
+     *   {@link TypedEntity} as it is consumed. A Document that can not be
+     *   converted causes a {@link RuntimeException} to be thrown from the
+     *   iterator's {@code next()}.
+     * @throws EntityStorageException Mongo reported a problem while starting the search.
+     */
+    @Override
+    public CloseableIterator<TypedEntity> search(final Type type, final Set<Property> properties) throws EntityStorageException {
+        requireNonNull(type);
+        requireNonNull(properties);
+
+        try {
+            // Match the specified Property values.
+            final Set<Bson> filters = properties.stream()
+                    .flatMap(property -> makePropertyFilters(type.getId(), property))
+                    .collect(Collectors.toSet());
+
+            // Only match explicitly Typed entities.
+            filters.add( makeExplicitTypeFilter(type.getId()) );
+
+            // Get a cursor over the Mongo Documents that represent the search results.
+            final MongoCursor<Document> cursor = mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .find(Filters.and(filters))
+                .iterator();
+
+            // Define the Converter that converts from Document into TypedEntity.
+            // Converter can not throw checked exceptions, so failures surface as RuntimeExceptions.
+            final Converter<TypedEntity> converter = document -> {
+                try {
+                    final Entity entity = ENTITY_CONVERTER.fromDocument(document);
+                    final Optional<TypedEntity> typedEntity = entity.makeTypedEntity( type.getId() );
+                    if(!typedEntity.isPresent()) {
+                        throw new RuntimeException("Entity with Subject '" + entity.getSubject() +
+                                "' could not be cast into Type '" + type.getId() + "'.");
+                    }
+                    return typedEntity.get();
+
+                } catch (final DocumentConverterException e) {
+                    throw new RuntimeException("Document '" + document + "' could not be parsed into an Entity.", e);
+                }
+            };
+
+            // Return a cursor that performs the conversion.
+            return new ConvertingCursor<TypedEntity>(converter, cursor);
+
+        } catch(final MongoException e) {
+            throw new EntityStorageException("Could not search Entity.", e);
+        }
+    }
+
+    /**
+     * Deletes the stored Document that matches a Subject.
+     *
+     * @param subject - The Subject of the Entity to delete. (not null)
+     * @return {@code true} if a Document was deleted; otherwise {@code false}.
+     * @throws EntityStorageException Mongo reported a problem.
+     */
+    @Override
+    public boolean delete(RyaURI subject) throws EntityStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document deleted = mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .findOneAndDelete( makeSubjectFilter(subject) );
+
+            return deleted != null;
+
+        } catch(final MongoException e) {
+            throw new EntityStorageException("Could not delete the Entity with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * @param subject - The Subject to match. (not null)
+     * @return A filter matching Documents whose {@link EntityDocumentConverter#SUBJECT}
+     *   field equals the Subject's data.
+     */
+    private static Bson makeSubjectFilter(RyaURI subject) {
+        return Filters.eq(EntityDocumentConverter.SUBJECT, subject.getData());
+    }
+
+    /**
+     * @param version - The version to match.
+     * @return A filter matching Documents whose {@link EntityDocumentConverter#VERSION}
+     *   field equals the version.
+     */
+    private static Bson makeVersionFilter(int version) {
+        return Filters.eq(EntityDocumentConverter.VERSION, version);
+    }
+
+    /**
+     * @param typeId - The Type ID to match. (not null)
+     * @return An equality filter on the {@link EntityDocumentConverter#EXPLICIT_TYPE_IDS}
+     *   field using the Type ID's data.
+     */
+    private static Bson makeExplicitTypeFilter(RyaURI typeId) {
+        return Filters.eq(EntityDocumentConverter.EXPLICIT_TYPE_IDS, typeId.getData());
+    }
+
+    /**
+     * Makes the filters that match a single {@link Property} value stored
+     * under a specific Type.
+     *
+     * @param typeId - The ID of the Type the Property is stored under. (not null)
+     * @param property - The Property whose data type and value must match. (not null)
+     * @return Two filters: one on the stored data type and one on the stored value.
+     */
+    private static Stream<Bson> makePropertyFilters(RyaURI typeId, Property property) {
+        final String propertyName = property.getName().getData();
+
+        // NOTE(review): the Type ID and property name are embedded in a '.' separated
+        // field path. If either may contain a '.', the path would not address the
+        // intended field - confirm how such names are handled.
+
+        // Must match the property's data type.
+        final String dataTypePath = Joiner.on(".").join(
+                new String[]{EntityDocumentConverter.PROPERTIES, typeId.getData(), propertyName, RyaTypeDocumentConverter.DATA_TYPE});
+        final String propertyDataType = property.getValue().getDataType().stringValue();
+        final Bson dataTypeFilter = Filters.eq(dataTypePath, propertyDataType);
+
+        // Must match the property's value.
+        final String valuePath = Joiner.on(".").join(
+                new String[]{EntityDocumentConverter.PROPERTIES, typeId.getData(), propertyName, RyaTypeDocumentConverter.VALUE});
+        final String propertyValue = property.getValue().getData();
+        final Bson valueFilter = Filters.eq(valuePath, propertyValue);
+
+        return Stream.of(dataTypeFilter, valueFilter);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java
new file mode 100644
index 0000000..88a0aa1
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoTypeStorage.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.model.Type;
+import org.apache.rya.indexing.entity.storage.CloseableIterator;
+import org.apache.rya.indexing.entity.storage.TypeStorage;
+import 
org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.google.common.base.Optional;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * A Mongo DB implementation of {@link TypeStorage}.
+ * <p>
+ * Every {@link Type} is kept as one {@link Document} inside the
+ * {@code COLLECTION_NAME} collection of the database that is named after the
+ * Rya instance.
+ */
+@ParametersAreNonnullByDefault
+public class MongoTypeStorage implements TypeStorage {
+
+    private static final String COLLECTION_NAME = "entity-types";
+
+    private static final TypeDocumentConverter TYPE_CONVERTER = new TypeDocumentConverter();
+
+    /**
+     * A client connected to the Mongo instance that hosts the Rya instance.
+     */
+    private final MongoClient mongo;
+
+    /**
+     * The name of the Rya instance the {@link Type}s are for.
+     */
+    private final String ryaInstanceName;
+
+    /**
+     * Constructs an instance of {@link MongoTypeStorage}.
+     *
+     * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+     * @param ryaInstanceName - The name of the Rya instance the {@link Type}s are for. (not null)
+     */
+    public MongoTypeStorage(MongoClient mongo, String ryaInstanceName) {
+        this.mongo = requireNonNull(mongo);
+        this.ryaInstanceName = requireNonNull(ryaInstanceName);
+    }
+
+    /**
+     * Inserts the Document form of the provided {@link Type} into the collection.
+     *
+     * @throws TypeStorageException The Mongo driver reported a failure while inserting.
+     */
+    @Override
+    public void create(Type type) throws TypeStorageException {
+        requireNonNull(type);
+
+        try {
+            final com.mongodb.client.MongoCollection<Document> collection = typeCollection();
+            collection.insertOne(TYPE_CONVERTER.toDocument(type));
+
+        } catch(final MongoException e) {
+            throw new TypeStorageException("Failed to create Type with ID '" + type.getId().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Looks up the first Document whose ID matches and converts it to a {@link Type}.
+     *
+     * @return The stored Type, or absent when no Document matched.
+     * @throws TypeStorageException The lookup or the Document conversion failed.
+     */
+    @Override
+    public Optional<Type> get(RyaURI typeId) throws TypeStorageException {
+        requireNonNull(typeId);
+
+        try {
+            final Document found = typeCollection()
+                .find( makeIdFilter(typeId) )
+                .first();
+
+            if(found == null) {
+                return Optional.absent();
+            }
+            return Optional.of( TYPE_CONVERTER.fromDocument(found) );
+
+        } catch(final MongoException | DocumentConverterException e) {
+            throw new TypeStorageException("Could not get the Type with ID '" + typeId.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Finds the Types whose stored property names match the provided name.
+     * Documents are converted as the returned iterator is consumed; a
+     * conversion failure surfaces as a {@link RuntimeException}.
+     *
+     * @throws TypeStorageException The Mongo driver reported a failure while starting the query.
+     */
+    @Override
+    public CloseableIterator<Type> search(RyaURI propertyName) throws TypeStorageException {
+        requireNonNull(propertyName);
+
+        try {
+            // Create a Filter that finds Types who have the provided property names.
+            final Bson byPropertyName = Filters.eq(
+                    TypeDocumentConverter.PROPERTY_NAMES,
+                    propertyName.getData());
+
+            final MongoCursor<Document> cursor = typeCollection()
+                .find( byPropertyName )
+                .iterator();
+
+            return new ConvertingCursor<Type>(MongoTypeStorage::convertToType, cursor);
+
+        } catch(final MongoException e) {
+            throw new TypeStorageException("Could not fetch Types that include the property '" + propertyName.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Removes the Document whose ID matches the provided Type ID.
+     *
+     * @return {@code true} when a Document was found and deleted; otherwise {@code false}.
+     * @throws TypeStorageException The Mongo driver reported a failure while deleting.
+     */
+    @Override
+    public boolean delete(RyaURI typeId) throws TypeStorageException {
+        requireNonNull(typeId);
+
+        try {
+            return typeCollection().findOneAndDelete( makeIdFilter(typeId) ) != null;
+
+        } catch(final MongoException e) {
+            throw new TypeStorageException("Could not delete the Type with ID '" + typeId.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * @return The collection of the Rya instance's database that holds the Type Documents.
+     */
+    private com.mongodb.client.MongoCollection<Document> typeCollection() {
+        return mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME);
+    }
+
+    /**
+     * Converts a Document into a {@link Type}, rethrowing any failure as an
+     * unchecked exception so it can be used while iterating a cursor.
+     */
+    private static Type convertToType(Document document) {
+        try {
+            return TYPE_CONVERTER.fromDocument(document);
+        } catch (final Exception e) {
+            throw new RuntimeException("Could not convert the Document '" + document + "' into a Type.", e);
+        }
+    }
+
+    /**
+     * Builds a filter on the {@link TypeDocumentConverter#ID} field for the provided Type ID.
+     */
+    private static Bson makeIdFilter(RyaURI typeId) {
+        final String idData = typeId.getData();
+        return Filters.eq(TypeDocumentConverter.ID, idData);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java
new file mode 100644
index 0000000..f7fd4ac
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/RyaTypeDocumentConverter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.bson.Document;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import mvm.rya.api.domain.RyaType;
+
+/**
+ * Converts between {@link RyaType} and {@link Document}.
+ * <p>
+ * The Document form has two fields: {@link #DATA_TYPE} holds the string form
+ * of the data type and {@link #VALUE} holds the data.
+ */
+@ParametersAreNonnullByDefault
+public class RyaTypeDocumentConverter implements DocumentConverter<RyaType> {
+
+    private static final ValueFactory VF = new ValueFactoryImpl();
+
+    public static final String DATA_TYPE = "dataType";
+    public static final String VALUE = "value";
+
+    /**
+     * Writes the data type and the data of a {@link RyaType} into a new Document.
+     */
+    @Override
+    public Document toDocument(RyaType ryaType) {
+        requireNonNull(ryaType);
+
+        final Document doc = new Document();
+        doc.append(DATA_TYPE, ryaType.getDataType().toString());
+        doc.append(VALUE, ryaType.getData());
+        return doc;
+    }
+
+    /**
+     * Rebuilds a {@link RyaType} from a Document.
+     *
+     * @throws DocumentConverterException The {@link #DATA_TYPE} or the
+     *   {@link #VALUE} field is missing from the Document.
+     */
+    @Override
+    public RyaType fromDocument(Document document) throws DocumentConverterException {
+        requireNonNull(document);
+
+        requireField(document, DATA_TYPE);
+        requireField(document, VALUE);
+
+        final String dataType = document.getString(DATA_TYPE);
+        return new RyaType(VF.createURI(dataType), document.getString(VALUE));
+    }
+
+    /**
+     * Ensures a Document contains a field.
+     *
+     * @param document - The Document to inspect. (not null)
+     * @param field - The name of the field that must be present. (not null)
+     * @throws DocumentConverterException The field is missing.
+     */
+    private static void requireField(Document document, String field) throws DocumentConverterException {
+        if(document.containsKey(field)) {
+            return;
+        }
+        throw new DocumentConverterException("Could not convert document '" + document +
+                "' because its '" + field + "' field is missing.");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java
new file mode 100644
index 0000000..104fbad
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/TypeDocumentConverter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.model.Type;
+import org.bson.Document;
+
+import com.google.common.collect.ImmutableSet;
+
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * Converts between {@link Type} and {@link Document}.
+ * <p>
+ * The Document form stores the Type's ID in the {@link #ID} field and the
+ * data of each of its property names as a list in the {@link #PROPERTY_NAMES}
+ * field.
+ */
+@ParametersAreNonnullByDefault
+public class TypeDocumentConverter implements DocumentConverter<Type> {
+
+    public static final String ID = "_id";
+    public static final String PROPERTY_NAMES = "propertyNames";
+
+    /**
+     * Writes the ID and the property names of a {@link Type} into a new Document.
+     */
+    @Override
+    public Document toDocument(final Type type) {
+        requireNonNull(type);
+
+        final Document doc = new Document(ID, type.getId().getData());
+
+        final List<String> names = new ArrayList<>();
+        for(final RyaURI propertyName : type.getPropertyNames()) {
+            names.add(propertyName.getData());
+        }
+
+        return doc.append(PROPERTY_NAMES, names);
+    }
+
+    /**
+     * Rebuilds a {@link Type} from a Document.
+     *
+     * @throws DocumentConverterException The {@link #ID} or the
+     *   {@link #PROPERTY_NAMES} field is missing from the Document.
+     */
+    @Override
+    public Type fromDocument(final Document document) throws DocumentConverterException {
+        requireNonNull(document);
+
+        requireField(document, ID);
+        requireField(document, PROPERTY_NAMES);
+
+        final RyaURI typeId = new RyaURI( document.getString(ID) );
+
+        final ImmutableSet.Builder<RyaURI> propertyNames = ImmutableSet.builder();
+        // The field is written by toDocument(...) as a list of Strings.
+        @SuppressWarnings("unchecked")
+        final List<String> storedNames = (List<String>) document.get(PROPERTY_NAMES);
+        for(final String storedName : storedNames) {
+            propertyNames.add(new RyaURI(storedName));
+        }
+
+        return new Type(typeId, propertyNames.build());
+    }
+
+    /**
+     * Ensures a Document contains a field.
+     *
+     * @param document - The Document to inspect. (not null)
+     * @param field - The name of the field that must be present. (not null)
+     * @throws DocumentConverterException The field is missing.
+     */
+    private static void requireField(final Document document, final String field) throws DocumentConverterException {
+        if(document.containsKey(field)) {
+            return;
+        }
+        throw new DocumentConverterException("Could not convert document '" + document +
+                "' because its '" + field + "' field is missing.");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
new file mode 100644
index 0000000..0bd9261
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.update;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.singleton;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.groupingBy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.model.Property;
+import org.apache.rya.indexing.entity.model.Type;
+import org.apache.rya.indexing.entity.storage.CloseableIterator;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.entity.storage.TypeStorage;
+import 
org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
+import org.apache.rya.indexing.entity.storage.TypeStorage.TypeStorageException;
+import org.openrdf.model.URI;
+import org.openrdf.model.vocabulary.RDF;
+
+import com.google.common.base.Objects;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+/**
+ * A base class that may be used to update an {@link EntityStorage} as new
+ * {@link RyaStatement}s are added to/removed from the Rya instance.
+ * <p>
+ * Subclasses supply the {@link EntityStorage} and {@link TypeStorage} that are
+ * used. {@link #setConf(Configuration)} must be invoked before any statements
+ * are stored or deleted.
+ */
+@ParametersAreNonnullByDefault
+public abstract class BaseEntityIndexer implements EntityIndexer {
+
+    /**
+     * When this URI is the Predicate of a Statement, it indicates a {@link Type} for an {@link Entity}.
+     */
+    private static final RyaURI TYPE_URI = new RyaURI( RDF.TYPE.toString() );
+
+    /** The configuration most recently provided to {@link #setConf(Configuration)}. */
+    private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>();
+
+    /** The storage that holds the {@link Entity}s maintained by this indexer. */
+    private final AtomicReference<EntityStorage> entities = new AtomicReference<>();
+
+    /** The storage that is searched for the {@link Type}s that include a property. */
+    private final AtomicReference<TypeStorage> types = new AtomicReference<>();
+
+    /**
+     * Creates the {@link EntityStorage} that will be used by the indexer.
+     *
+     * @param conf - Indicates how the {@link EntityStorage} is initialized. (not null)
+     * @return The {@link EntityStorage} that will be used by this indexer.
+     */
+    public abstract @Nullable EntityStorage getEntityStorage(Configuration conf);
+
+    /**
+     * Creates the {@link TypeStorage} that will be used by the indexer.
+     *
+     * @param conf - Indicates how the {@link TypeStorage} is initialized. (not null)
+     * @return The {@link TypeStorage} that will be used by this indexer.
+     */
+    public abstract @Nullable TypeStorage getTypeStorage(Configuration conf);
+
+    /**
+     * Initializes the storages from the provided configuration and remembers
+     * the configuration so that {@link #getConf()} is able to return it.
+     *
+     * @param conf - The configuration of the indexer. (not null)
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        requireNonNull(conf);
+        entities.set( getEntityStorage(conf) );
+        types.set( getTypeStorage(conf) );
+
+        // Previously the configuration was never recorded, so getConf() always
+        // returned null. Reuse the instance when it already has the right type.
+        final MongoDBRdfConfiguration mongoConf = conf instanceof MongoDBRdfConfiguration ?
+                (MongoDBRdfConfiguration) conf :
+                new MongoDBRdfConfiguration(conf);
+        configuration.set(mongoConf);
+    }
+
+    /**
+     * @return The configuration that was last provided to
+     *   {@link #setConf(Configuration)}, or {@code null} if it has not been set.
+     */
+    @Override
+    public Configuration getConf() {
+        return configuration.get();
+    }
+
+    /**
+     * Updates the Entity index with a single statement.
+     *
+     * @param statement - The statement that was added. (not null)
+     * @throws IOException The Entity index could not be updated.
+     */
+    @Override
+    public void storeStatement(RyaStatement statement) throws IOException {
+        requireNonNull(statement);
+        storeStatements( singleton(statement) );
+    }
+
+    /**
+     * Updates the Entity index with a batch of statements. The statements are
+     * grouped by Subject so that each {@link Entity} is updated once.
+     *
+     * @param statements - The statements that were added. (not null)
+     * @throws IOException The Entity index could not be updated.
+     */
+    @Override
+    public void storeStatements(Collection<RyaStatement> statements) throws IOException {
+        requireNonNull(statements);
+
+        final Map<RyaURI,List<RyaStatement>> groupedBySubject = statements.stream()
+            .collect(groupingBy(RyaStatement::getSubject));
+
+        for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) {
+            try {
+                updateEntity(entry.getKey(), entry.getValue());
+            } catch (final EntityStorageException e) {
+                throw new IOException("Failed to update the Entity index.", e);
+            }
+        }
+    }
+
+    /**
+     * Updates a {@link Entity} to reflect new {@link RyaStatement}s.
+     *
+     * @param subject - The Subject of the {@link Entity} the statements are for. (not null)
+     * @param statements - Statements that the {@link Entity} will be updated with. (not null)
+     * @throws EntityStorageException The {@link Entity} could not be updated.
+     */
+    private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException {
+        requireNonNull(subject);
+        requireNonNull(statements);
+
+        final EntityStorage entities = this.entities.get();
+        final TypeStorage types = this.types.get();
+        checkState(entities != null, "Must set this indexers configuration before storing statements.");
+        checkState(types != null, "Must set this indexers configuration before storing statements.");
+
+        new EntityUpdater(entities).update(subject, old -> {
+            // Create a builder with the updated Version. A brand new Entity
+            // starts at version 0; an existing one is incremented.
+            final Entity.Builder updated;
+            if(!old.isPresent()) {
+                updated = Entity.builder()
+                        .setSubject(subject)
+                        .setVersion(0);
+            } else {
+                final int updatedVersion = old.get().getVersion() + 1;
+                updated = Entity.builder(old.get())
+                        .setVersion( updatedVersion );
+            }
+
+            // Update the entity based on the Statements.
+            for(final RyaStatement statement : statements) {
+
+                // The Statement is setting an Explicit Type ID for the Entity.
+                if(Objects.equal(TYPE_URI, statement.getPredicate())) {
+                    final RyaURI typeId = new RyaURI(statement.getObject().getData());
+                    updated.setExplicitType(typeId);
+                }
+
+                // The Statement is adding a Property to the Entity.
+                else {
+                    final RyaURI propertyName = statement.getPredicate();
+                    final RyaType propertyValue = statement.getObject();
+
+                    try(final CloseableIterator<Type> typesIt = types.search(propertyName)) {
+                        // Set the Property for each type that includes the Statement's predicate.
+                        while(typesIt.hasNext()) {
+                            final RyaURI typeId = typesIt.next().getId();
+                            updated.setProperty(typeId, new Property(propertyName, propertyValue));
+                        }
+                    } catch (final TypeStorageException | IOException e) {
+                        // The mutator can not throw checked exceptions, so the
+                        // failure is rethrown unchecked.
+                        throw new RuntimeException("Failed to fetch Types that include the property name '" +
+                                statement.getPredicate().getData() + "'.", e);
+                    }
+                }
+            }
+
+            return Optional.of( updated.build() );
+        });
+    }
+
+    /**
+     * Removes the effect of a statement from the {@link Entity} of its Subject.
+     * Nothing is written when the Entity does not exist or when the statement
+     * is not reflected in it.
+     *
+     * @param statement - The statement that was removed. (not null)
+     * @throws IOException The Entity index could not be updated.
+     */
+    @Override
+    public void deleteStatement(RyaStatement statement) throws IOException {
+        requireNonNull(statement);
+
+        final EntityStorage entities = this.entities.get();
+        final TypeStorage types = this.types.get();
+        checkState(entities != null, "Must set this indexers configuration before deleting statements.");
+        checkState(types != null, "Must set this indexers configuration before deleting statements.");
+
+        try {
+            new EntityUpdater(entities).update(statement.getSubject(), old -> {
+                // If there is no Entity for the subject of the statement, then do nothing.
+                if(!old.isPresent()) {
+                    return Optional.empty();
+                }
+
+                final Entity oldEntity = old.get();
+
+                // Increment the version of the Entity.
+                final Entity.Builder updated = Entity.builder(oldEntity);
+                updated.setVersion(oldEntity.getVersion() + 1);
+
+                if(TYPE_URI.equals(statement.getPredicate())) {
+                    // If the Type ID already isn't in the list of explicit types, then do nothing.
+                    final RyaURI typeId = new RyaURI( statement.getObject().getData() );
+                    if(!oldEntity.getExplicitTypeIds().contains(typeId)) {
+                        return Optional.empty();
+                    }
+
+                    // Otherwise remove it from the list.
+                    updated.unsetExplicitType(typeId);
+                } else {
+                    // If the deleted property appears within the old entity's properties, then remove it.
+                    final RyaURI deletedPropertyName = statement.getPredicate();
+
+                    boolean propertyWasPresent = false;
+                    for(final RyaURI typeId : oldEntity.getProperties().keySet()) {
+                        for(final RyaURI propertyName : oldEntity.getProperties().get(typeId).keySet()) {
+                            if(deletedPropertyName.equals(propertyName)) {
+                                propertyWasPresent = true;
+                                updated.unsetProperty(typeId, deletedPropertyName);
+                            }
+                        }
+                    }
+
+                    // If no properties were removed, then do nothing.
+                    if(!propertyWasPresent) {
+                        return Optional.empty();
+                    }
+                }
+
+                return Optional.of( updated.build() );
+            });
+        } catch (final EntityStorageException e) {
+            throw new IOException("Failed to update the Entity index.", e);
+        }
+    }
+
+    /**
+     * @return Always {@code null}.
+     */
+    @Override
+    public String getTableName() {
+        // Storage details have been abstracted away from the indexer.
+        return null;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // We do not need to do anything to flush since we do not batch work.
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Nothing to close.
+    }
+
+    @Override
+    public void dropGraph(RyaURI... graphs) {
+        // We do not support graphs when performing entity centric indexing.
+    }
+
+    /**
+     * @return Always {@code null}.
+     */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        // This isn't used anywhere in Rya, so it will not be implemented.
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java
new file mode 100644
index 0000000..9ef5dfd
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityIndexer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.update;
+
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+/**
+ * Updates the {@link org.apache.rya.indexing.entity.model.Entity}s that are in
+ * a {@link EntityStorage} when new {@link RyaStatement}s are added/removed
+ * from the Rya instance.
+ * <p>
+ * This interface declares no members of its own; everything it offers is
+ * inherited from {@link RyaSecondaryIndexer}.
+ */
+public interface EntityIndexer extends RyaSecondaryIndexer {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
new file mode 100644
index 0000000..4bb003f
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.update;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import 
org.apache.rya.indexing.entity.storage.EntityStorage.EntityAlreadyExistsException;
+import 
org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
+import 
org.apache.rya.indexing.entity.storage.EntityStorage.StaleUpdateException;
+
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * Performs update operations over an {@link EntityStorage}.
+ */
+@ParametersAreNonnullByDefault
+public class EntityUpdater {
+
+    private final EntityStorage storage;
+
+    /**
+     * Constructs an instance of {@link EntityUpdater}.
+     *
+     * @param storage - The storage this updater operates over. (not null)
+     */
+    public EntityUpdater(EntityStorage storage) {
+        this.storage = requireNonNull(storage);
+    }
+
+    /**
+     * Tries to update the state of an {@link Entity} until the update succeeds
+     * or a non-recoverable exception is thrown.
+     * <p>
+     * Each attempt fetches the current state, hands it to the mutator, and
+     * writes whatever the mutator returns. When the write is rejected because
+     * the Entity already exists or because the update was stale, the whole
+     * attempt is repeated.
+     *
+     * @param subject - The Subject of the {@link Entity} that will be updated. (not null)
+     * @param mutator - Performs the mutation on the old state of the Entity and returns
+     *   the new state of the Entity. (not null)
+     * @throws EntityStorageException A non-recoverable error has caused the update to fail.
+     */
+    public void update(RyaURI subject, EntityMutator mutator) throws EntityStorageException {
+        requireNonNull(subject);
+        requireNonNull(mutator);
+
+        while(true) {
+            try {
+                // Fetch the current state of the Entity and compute its replacement.
+                final Optional<Entity> current = storage.get(subject);
+                final Optional<Entity> replacement = mutator.apply(current);
+
+                // An empty replacement means there is no work to do.
+                if(replacement.isPresent()) {
+                    if(current.isPresent()) {
+                        storage.update(current.get(), replacement.get());
+                    } else {
+                        storage.create(replacement.get());
+                    }
+                }
+                return;
+            } catch(final EntityAlreadyExistsException | StaleUpdateException e) {
+                // These are recoverable exceptions. Try again.
+            } catch(final RuntimeException e) {
+                throw new EntityStorageException("Failed to update Entity with Subject '" + subject.getData() + "'.", e);
+            }
+        }
+    }
+
+    /**
+     * Implementations of this interface are used to update the state of an
+     * {@link Entity} in unison with a {@link EntityUpdater}.
+     * <p>
+     * This table describes what the updater will do depending on if an Entity
+     * exists and if an updated Entity is returned.
+     * <p>
+     * <table border="1px">
+     *     <tr><th>Entity Provided</th><th>Update Returned</th><th>Effect</th></tr>
+     *     <tr>
+     *         <td>true</td>
+     *         <td>true</td>
+     *         <td>The old Entity will be updated using the returned state.</td>
+     *     </tr>
+     *     <tr>
+     *         <td>true</td>
+     *         <td>false</td>
+     *         <td>No work is performed.</td>
+     *     </tr>
+     *     <tr>
+     *         <td>false</td>
+     *         <td>true</td>
+     *         <td>A new Entity will be created using the returned state.</td>
+     *     </tr>
+     *     <tr>
+     *         <td>false</td>
+     *         <td>false</td>
+     *         <td>No work is performed.</td>
+     *     </tr>
+     * </table>
+     */
+    public interface EntityMutator extends Function<Optional<Entity>, Optional<Entity>> { }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java
new file mode 100644
index 0000000..71b5198
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/mongo/MongoEntityIndexer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.entity.update.mongo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.entity.storage.TypeStorage;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.apache.rya.indexing.entity.storage.mongo.MongoTypeStorage;
+import org.apache.rya.indexing.entity.update.BaseEntityIndexer;
+import org.apache.rya.indexing.entity.update.EntityIndexer;
+
+import com.mongodb.MongoClient;
+
+import mvm.rya.mongodb.MongoConnectorFactory;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+/**
+ * A MongoDB implementation of {@link EntityIndexer}.
+ */
+@ParametersAreNonnullByDefault
+public class MongoEntityIndexer extends BaseEntityIndexer {
+
+    @Override
+    public EntityStorage getEntityStorage(Configuration conf) {
+        final MongoClient mongoClient = 
MongoConnectorFactory.getMongoClient(conf);
+        final String ryaInstanceName = new 
MongoDBRdfConfiguration(conf).getMongoDBName();
+        return new MongoEntityStorage(mongoClient, ryaInstanceName);
+    }
+
+    @Override
+    public TypeStorage getTypeStorage(Configuration conf) {
+        final MongoClient mongoClient = 
MongoConnectorFactory.getMongoClient(conf);
+        final String ryaInstanceName = new 
MongoDBRdfConfiguration(conf).getMongoDBName();
+        return new MongoTypeStorage(mongoClient, ryaInstanceName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java
 
b/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java
deleted file mode 100644
index ab263e0..0000000
--- 
a/extras/indexing/src/test/java/mvm/rya/indexing/entity/query/EntityQueryNodeTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.entity.query;
-
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
-
-import org.junit.Test;
-import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.helpers.StatementPatternCollector;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.collect.ImmutableSet;
-
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.indexing.entity.model.Type;
-import mvm.rya.indexing.entity.storage.EntityStorage;
-
-/**
- * Unit tests the methods of {@link EntityQueryNode}.
- */
-public class EntityQueryNodeTest {
-
-    private static final Type PERSON_TYPE =
-            new Type(new RyaURI("urn:person"),
-                ImmutableSet.<RyaURI>builder()
-                    .add(new RyaURI("urn:name"))
-                    .add(new RyaURI("urn:age"))
-                    .add(new RyaURI("urn:eye"))
-                    .build());
-
-    private static final Type EMPLOYEE_TYPE =
-            new Type(new RyaURI("urn:employee"),
-                ImmutableSet.<RyaURI>builder()
-                    .add(new RyaURI("urn:name"))
-                    .add(new RyaURI("urn:hoursPerWeek"))
-                    .build());
-
-    @Test(expected = IllegalStateException.class)
-    public void constructor_differentSubjects() throws Exception {
-        // A pattern that has two different subjects.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "<urn:SSN:111-11-1111> <" + RDF.TYPE + "> <urn:person> ."+
-                    "<urn:SSN:111-11-1111> <urn:age> ?age . " +
-                    "<urn:SSN:111-11-1111> <urn:eye> ?eye . " +
-                    "<urn:SSN:111-11-1111> <urn:name> ?name . " +
-                    "<urn:SSN:222-22-2222> <urn:age> ?age . " +
-                    "<urn:SSN:222-22-2222> <urn:eye> ?eye . " +
-                    "<urn:SSN:222-22-2222> <urn:name> ?name . " +
-                "}");
-
-
-        // This will fail.
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void constructor_variablePredicate() throws Exception {
-        // A pattern that has a variable for its predicate.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "?subject <" + RDF.TYPE + "> <urn:person> ."+
-                    "?subject ?variableProperty ?value . " +
-                    "?subject <urn:eye> ?eye . " +
-                    "?subject <urn:name> ?name . " +
-                "}");
-
-
-        // This will fail.
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void constructor_predicateNotPartOfType() throws Exception {
-        // A pattern that does uses a predicate that is not part of the type.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "?subject <" + RDF.TYPE + "> <urn:person> ."+
-                    "?subject <urn:age> ?age . " +
-                    "?subject <urn:eye> ?eye . " +
-                    "?subject <urn:name> ?name . " +
-                    "?subject <urn:notPartOfType> ?value . " +
-                "}");
-
-        // This will fail.
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void constructor_typeMissing() throws Exception {
-        // A pattern that does uses a predicate that is not part of the type.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "?subject <urn:age> ?age . " +
-                    "?subject <urn:eye> ?eye . " +
-                    "?subject <urn:name> ?name . " +
-                "}");
-
-        // This will fail.
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void constructor_wrongType() throws Exception {
-        // A pattern that does uses a predicate that is not part of the type.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "?subject <" + RDF.TYPE + "> <urn:person> ."+
-                    "?subject <urn:age> ?age . " +
-                    "?subject <urn:eye> ?eye . " +
-                    "?subject <urn:name> ?name . " +
-                "}");
-
-        // This will fail.
-        new EntityQueryNode(EMPLOYEE_TYPE, patterns, 
mock(EntityStorage.class));
-    }
-
-    // Happy path test.
-
-    // TODO test for all of the types of preconditions
-    //      test when a binding set can join
-    //      test when a binding set does not join
-    //      test when there are constants that are part of the query
-    //      test when there are variables that are part of the query.
-
-    @Test
-    public void evaluate_constantSubject() throws Exception {
-        // A set of patterns that match a sepecific Entity subject.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "<urn:SSN:111-11-1111> <" + RDF.TYPE + "> <urn:person> ."+
-                    "<urn:SSN:111-11-1111> <urn:age> ?age . " +
-                    "<urn:SSN:111-11-1111> <urn:eye> ?eye . " +
-                    "<urn:SSN:111-11-1111> <urn:name> ?name . " +
-                "}");
-
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-
-
-        // TODO implement
-    }
-
-    @Test
-    public void evaluate_variableSubject() throws Exception {
-        // A set of patterns that matches a variable Entity subject.
-        final List<StatementPattern> patterns = getSPs(
-                "SELECT * WHERE { " +
-                    "?subject <" + RDF.TYPE + "> <urn:person> ."+
-                    "?subject <urn:age> ?age . " +
-                    "?subject <urn:eye> ?eye . " +
-                    "?subject <urn:name> ?name . " +
-                "}");
-
-        new EntityQueryNode(PERSON_TYPE, patterns, mock(EntityStorage.class));
-
-
-        // TODO implement
-    }
-
-
-
-    /**
-     * TODO doc
-     *
-     * @param sparql
-     * @return
-     * @throws MalformedQueryException
-     */
-    private static List<StatementPattern> getSPs(String sparql) throws 
MalformedQueryException {
-        final StatementPatternCollector spCollector = new 
StatementPatternCollector();
-        new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr().visit(spCollector);
-        return spCollector.getStatementPatterns();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/07643eb7/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java
 
b/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java
deleted file mode 100644
index bbd3f06..0000000
--- 
a/extras/indexing/src/test/java/mvm/rya/indexing/entity/storage/mongo/EntityDocumentConverterTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.entity.storage.mongo;
-
-import static org.junit.Assert.assertEquals;
-
-import org.bson.Document;
-import org.junit.Test;
-import org.openrdf.model.vocabulary.XMLSchema;
-
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.indexing.entity.model.Entity;
-import mvm.rya.indexing.entity.model.Property;
-import 
mvm.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
-
-/**
- * Tests the methods of {@link EntityDocumentConverter}.
- */
-public class EntityDocumentConverterTest {
-
-    @Test
-    public void to_and_from_document() throws DocumentConverterException {
-        // Convert an Entity into a Document.
-        final Entity entity = Entity.builder()
-                .setSubject(new RyaURI("urn:alice"))
-                // Add some explicily typed properties.
-                .setExplicitType(new RyaURI("urn:person"))
-                .setProperty(new RyaURI("urn:person"), new Property(new 
RyaURI("urn:age"), new RyaType(XMLSchema.INT, "30")))
-                .setProperty(new RyaURI("urn:person"), new Property(new 
RyaURI("urn:eye"), new RyaType("blue")))
-                // Add some implicitly typed properties.
-                .setProperty(new RyaURI("urn:employee"), new Property(new 
RyaURI("urn:hours"), new RyaType(XMLSchema.INT, "40")))
-                .setProperty(new RyaURI("urn:employee"), new Property(new 
RyaURI("urn:employer"), new RyaType("Burger Joint")))
-                .build();
-
-        final Document document = new 
EntityDocumentConverter().toDocument(entity);
-
-        // Convert the Document back into an Entity.
-        final Entity converted = new 
EntityDocumentConverter().fromDocument(document);
-
-        // Ensure the original matches the round trip converted Entity.
-        assertEquals(entity, converted);
-    }
-}
\ No newline at end of file

Reply via email to