Improve read repair for a missing edge (app->collection). Also restrict the 
unique value cleanup read repair so that it runs only on the initial read, 
before the write-first strategy for unique values is applied. Add more tests 
around the read repairs.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/724968a2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/724968a2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/724968a2

Branch: refs/heads/master
Commit: 724968a2bc354e3c3f317e1d0b98026c2fe3baeb
Parents: 9c4b524
Author: Michael Russo <[email protected]>
Authored: Wed Jun 29 01:45:57 2016 -0700
Committer: Michael Russo <[email protected]>
Committed: Wed Jun 29 01:45:57 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  6 ++
 .../corepersistence/CpRelationManager.java      | 42 +++++++++
 .../usergrid/persistence/EntityManager.java     | 16 +++-
 .../usergrid/persistence/RelationManager.java   |  2 +
 .../impl/EntityCollectionManagerImpl.java       |  5 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     | 13 ++-
 .../UniqueValueSerializationStrategy.java       |  7 +-
 .../UniqueValueSerializationStrategyImpl.java   | 89 +++++++++++---------
 ...iqueValueSerializationStrategyProxyImpl.java |  8 +-
 .../mvcc/stage/write/WriteUniqueVerifyIT.java   | 71 ++++++++++++++++
 ...niqueValueSerializationStrategyImplTest.java | 26 ++++--
 .../collection/users/PermissionsResourceIT.java |  4 +-
 .../services/AbstractCollectionService.java     | 37 ++++++--
 .../usergrid/services/ServiceInvocationIT.java  | 28 +++++-
 14 files changed, 280 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index ab62b36..3dc0d13 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1545,6 +1545,12 @@ public class CpEntityManager implements EntityManager {
         getRelationManager( entityRef ).removeFromCollection( collectionName, 
itemRef );
     }
 
+    @Override
+    public void removeItemFromCollection( EntityRef entityRef, String 
collectionName, EntityRef itemRef ) throws Exception {
+
+        getRelationManager( entityRef ).removeItemFromCollection( 
collectionName, itemRef );
+    }
+
 
     @Override
     public Set<String> getCollectionIndexes( EntityRef entity, String 
collectionName ) throws Exception {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6e1bade..fbf0b14 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -579,6 +579,48 @@ public class CpRelationManager implements RelationManager {
         }
     }
 
+    @Override
+    public void removeItemFromCollection( String collectionName, EntityRef 
itemRef ) throws Exception {
+
+        Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
+
+        // remove edge from collection to item
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+
+
+
+        // mark the edge versions and take the first for later delete edge 
queue event ( load is descending )
+        final Edge markedSourceEdge = gm.loadEdgeVersions(
+            CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), 
collectionName, entityId ) )
+            .flatMap(edge -> 
gm.markEdge(edge)).toBlocking().firstOrDefault(null);
+
+
+        Edge markedReversedEdge = null;
+        CollectionInfo collection = getDefaultSchema().getCollection( 
headEntity.getType(), collectionName );
+        if (collection != null && collection.getLinkedCollection() != null) {
+            // delete reverse edges
+            final String pluralType = InflectionUtils.pluralize( 
cpHeadEntity.getId().getType() );
+            markedReversedEdge = gm.loadEdgeVersions(
+                CpNamingUtils.createEdgeFromCollectionName( entityId, 
pluralType, cpHeadEntity.getId() ) )
+                .flatMap(reverseEdge -> 
gm.markEdge(reverseEdge)).toBlocking().firstOrDefault(null);
+        }
+
+
+        /**
+         * Remove from the index.  This will call gm.deleteEdge which also 
deletes the reverse edge(s) and de-indexes
+         * older versions of the edge(s).
+         *
+         */
+        if( markedSourceEdge != null ) {
+            indexService.queueDeleteEdge(applicationScope, markedSourceEdge);
+        }
+        if( markedReversedEdge != null ){
+            indexService.queueDeleteEdge(applicationScope, markedReversedEdge);
+
+        }
+
+    }
+
 
     @Override
     public void copyRelationships( String srcRelationName, EntityRef 
dstEntityRef, String dstRelationName )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 53a7a89..7e25a80 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -394,17 +394,29 @@ public interface EntityManager {
                                           Map<String, Object> properties ) 
throws Exception;
 
     /**
-     * Removes an entity to the specified collection belonging to the 
specified entity.
+     * Deletes an entity from the specified collection.
      *
      * @param entityRef an entity reference
      * @param collectionName the collection name.
-     * @param itemRef a entity to be removed from the collection.
+     * @param itemRef a entity to be deleted and removed from the collection.
      *
      * @throws Exception the exception
      */
     public void removeFromCollection( EntityRef entityRef, String 
collectionName, EntityRef itemRef)
             throws Exception;
 
+    /**
+     * Removes only the edge from the specified collection, the entity is left 
in-tact
+     *
+     * @param entityRef an entity reference
+     * @param collectionName the collection name.
+     * @param itemRef a entity to be removed from the collection.
+     *
+     * @throws Exception the exception
+     */
+    public void removeItemFromCollection( EntityRef entityRef, String 
collectionName, EntityRef itemRef)
+        throws Exception;
+
     public Results searchCollection( EntityRef entityRef, String 
collectionName, Query query )
             throws Exception;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
index 0011183..f0647ac 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
@@ -59,6 +59,8 @@ public interface RelationManager {
 
     public void removeFromCollection( String collectionName, EntityRef itemRef 
) throws Exception;
 
+    public void removeItemFromCollection( String collectionName, EntityRef 
itemRef ) throws Exception;
+
     public void copyRelationships( String srcRelationName, EntityRef 
dstEntityRef, String dstRelationName )
             throws Exception;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 1ccc18f..523b4df 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -385,10 +385,11 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager {
                     if ( entity == null || !entity.getEntity().isPresent() ) {
 
                         if(logger.isTraceEnabled()) {
-                            logger.trace("Unique value [{}={}] does not have 
corresponding entity, executing " +
+                            logger.trace("Unique value [{}={}] does not have 
corresponding entity [{}], executing " +
                                 "read repair to remove stale unique value 
entry",
                                 expectedUnique.getField().getName(),
-                                expectedUnique.getField().getValue().toString()
+                                
expectedUnique.getField().getValue().toString(),
+                                expectedUnique.getEntityId()
                             );
                         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 7b76dc8..d7c8ecd 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -122,7 +122,9 @@ public class WriteUniqueVerify implements 
Action1<CollectionIoEvent<MvccEntity>>
             try {
 
                 // loading will retrieve the oldest unique value entry for the 
field
-                UniqueValueSet set = uniqueValueStrat.load(scope, 
written.getEntityId().getType(), Collections.singletonList(written.getField()));
+                // purposely enable the read repair here to clean up before we 
write
+                UniqueValueSet set = uniqueValueStrat.load(scope, 
cassandraFig.getReadCL(),
+                    written.getEntityId().getType(), 
Collections.singletonList(written.getField()), true);
 
 
                 set.forEach(uniqueValue -> {
@@ -149,6 +151,11 @@ public class WriteUniqueVerify implements 
Action1<CollectionIoEvent<MvccEntity>>
         }
 
         if(preWriteUniquenessViolations.size() > 0 ){
+            if(logger.isTraceEnabled()){
+                logger.trace("Pre-write unique violations found, raising 
exception before executing first write");
+            }
+            logger.error("Pre-write unique violations found, raising exception 
before executing first write");
+
             throw new WriteUniqueVerifyException(mvccEntity, scope,
                 preWriteUniquenessViolations );
         }
@@ -217,7 +224,9 @@ public class WriteUniqueVerify implements 
Action1<CollectionIoEvent<MvccEntity>>
             final UniqueValueSet uniqueValues;
             try {
                 // load ascending for verification to make sure we wrote is 
the last read back
-                uniqueValues = uniqueValueSerializationStrategy.load( scope, 
consistencyLevel, type,  uniqueFields );
+                // don't read repair on this read because our write-first 
strategy will introduce a duplicate
+                uniqueValues =
+                    uniqueValueSerializationStrategy.load( scope, 
consistencyLevel, type,  uniqueFields, false);
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to read from cassandra", e 
);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 95cfa68..35bb1b8 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -79,11 +79,12 @@ public interface UniqueValueSerializationStrategy extends 
Migration, VersionedDa
     * @param consistencyLevel Consistency level of query
     * @param type The type the unique value exists within
     * @param fields Field name/value to search for
-    * @return UniqueValueSet containing fields from the collection that exist 
in cassandra
+    * @param useReadRepair
+     * @return UniqueValueSet containing fields from the collection that exist 
in cassandra
     * @throws ConnectionException on error connecting to Cassandra
     */
-    UniqueValueSet load( ApplicationScope applicationScope, ConsistencyLevel 
consistencyLevel, String type,
-                         Collection<Field> fields ) throws ConnectionException;
+    UniqueValueSet load(ApplicationScope applicationScope, ConsistencyLevel 
consistencyLevel, String type,
+                        Collection<Field> fields, boolean useReadRepair) 
throws ConnectionException;
 
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index aec2e58..db93272 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -237,13 +237,13 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     @Override
     public UniqueValueSet load( final ApplicationScope colScope, final String 
type, final Collection<Field> fields )
         throws ConnectionException {
-        return load( colScope, ConsistencyLevel.valueOf( 
cassandraFig.getReadCL() ), type, fields );
+        return load( colScope, ConsistencyLevel.valueOf( 
cassandraFig.getReadCL() ), type, fields, false);
     }
 
 
     @Override
-    public UniqueValueSet load( final ApplicationScope appScope, final 
ConsistencyLevel consistencyLevel,
-                                final String type, final Collection<Field> 
fields ) throws ConnectionException {
+    public UniqueValueSet load(final ApplicationScope appScope, final 
ConsistencyLevel consistencyLevel,
+                               final String type, final Collection<Field> 
fields, boolean useReadRepair) throws ConnectionException {
 
         Preconditions.checkNotNull( fields, "fields are required" );
         Preconditions.checkArgument( fields.size() > 0, "More than 1 field 
must be specified" );
@@ -307,71 +307,78 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
                 final UniqueValue uniqueValue =
                     new UniqueValueImpl(field, entityVersion.getEntityId(), 
entityVersion.getEntityVersion());
 
-
                 // set the initial candidate and move on
-                if(candidates.size() == 0){
+                if (candidates.size() == 0) {
                     candidates.add(uniqueValue);
                     continue;
                 }
 
-                final int result = uniqueValueComparator.compare(uniqueValue, 
candidates.get(candidates.size() -1));
+                if(!useReadRepair){
 
-                if(result == 0){
+                    // take only the first
+                    break;
 
-                    // do nothing, only versions can be newer and we're not 
worried about newer versions of same entity
-                    if(logger.isTraceEnabled()){
-                        logger.trace("Candidate unique value is equal to the 
current unique value");
-                    }
+                } else {
 
-                    // update candidate w/ latest version
-                    candidates.add(uniqueValue);
 
-                }else if(result < 0){
+                    final int result = 
uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 
1));
 
-                    // delete the duplicate from the unique value index
-                    candidates.forEach(candidate -> {
+                    if (result == 0) {
 
-                        try {
+                        // do nothing, only versions can be newer and we're 
not worried about newer versions of same entity
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Candidate unique value is equal to 
the current unique value");
+                        }
 
-                            logger.warn("Duplicate unique value [{}={}] found 
for application [{}], removing newer " +
-                                    "entry with entity id [{}] and entity 
version [{}]", field.getName(),
-                                    field.getValue().toString(), 
applicationId.getUuid(),
-                                candidate.getEntityId().getUuid(), 
candidate.getEntityVersion() );
+                        // update candidate w/ latest version
+                        candidates.add(uniqueValue);
 
-                            delete(appScope, candidate ).execute();
+                    } else if (result < 0) {
 
-                        } catch (ConnectionException e) {
-                            // do nothing for now
-                        }
+                        // delete the duplicate from the unique value index
+                        candidates.forEach(candidate -> {
 
-                    });
+                            try {
 
-                    // clear the transient candidates list
-                    candidates.clear();
+                                logger.warn("Duplicate unique value [{}={}] 
found for application [{}], removing newer " +
+                                        "entry with entity id [{}] and entity 
version [{}]", field.getName(),
+                                    field.getValue().toString(), 
applicationId.getUuid(),
+                                    candidate.getEntityId().getUuid(), 
candidate.getEntityVersion());
 
-                    if(logger.isTraceEnabled()) {
-                        logger.trace("Updating candidate to entity id [{}] and 
entity version [{}]",
-                            uniqueValue.getEntityId().getUuid(), 
uniqueValue.getEntityVersion());
+                                delete(appScope, candidate).execute();
 
-                    }
+                            } catch (ConnectionException e) {
+                                // do nothing for now
+                            }
 
-                    // add our new candidate to the list
-                    candidates.add(uniqueValue);
+                        });
+
+                        // clear the transient candidates list
+                        candidates.clear();
 
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Updating candidate to entity id [{}] 
and entity version [{}]",
+                                uniqueValue.getEntityId().getUuid(), 
uniqueValue.getEntityVersion());
 
-                }else{
+                        }
 
-                    logger.warn("Duplicate unique value [{}={}] found for 
application [{}], removing newer entry " +
-                            "with entity id [{}] and entity version [{}].", 
field.getName(), field.getValue().toString(),
-                        applicationId.getUuid(), 
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion() );
+                        // add our new candidate to the list
+                        candidates.add(uniqueValue);
 
-                    // delete the duplicate from the unique value index
-                    delete(appScope, uniqueValue ).execute();
 
+                    } else {
 
-                }
+                        logger.warn("Duplicate unique value [{}={}] found for 
application [{}], removing newer entry " +
+                                "with entity id [{}] and entity version 
[{}].", field.getName(), field.getValue().toString(),
+                            applicationId.getUuid(), 
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
 
+                        // delete the duplicate from the unique value index
+                        delete(appScope, uniqueValue).execute();
 
+
+                    }
+
+                }
             }
 
             // take the last candidate ( should be the latest version) and add 
to the result set

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index 1de4052..b9c9999 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@ -115,16 +115,16 @@ public class UniqueValueSerializationStrategyProxyImpl 
implements UniqueValueSer
 
 
     @Override
-    public UniqueValueSet load( final ApplicationScope applicationScope, final 
ConsistencyLevel consistencyLevel,
-                                final String type, final Collection<Field> 
fields ) throws ConnectionException {
+    public UniqueValueSet load(final ApplicationScope applicationScope, final 
ConsistencyLevel consistencyLevel,
+                               final String type, final Collection<Field> 
fields, boolean useReadRepair) throws ConnectionException {
 
         final MigrationRelationship<UniqueValueSerializationStrategy> 
migration = getMigrationRelationShip();
 
         if ( migration.needsMigration() ) {
-            return migration.from.load( applicationScope, type, fields );
+            return migration.from.load( applicationScope, consistencyLevel, 
type, fields, useReadRepair );
         }
 
-        return migration.to.load( applicationScope, type, fields );
+        return migration.to.load( applicationScope, consistencyLevel, type, 
fields, useReadRepair );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 9d0cd20..3d411a4 100644
--- 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@ -18,6 +18,12 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import org.apache.usergrid.persistence.collection.FieldSet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import 
org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -41,6 +47,8 @@ import 
org.apache.usergrid.persistence.model.field.StringField;
 
 import com.google.inject.Inject;
 
+import java.util.Collections;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -60,6 +68,9 @@ public class WriteUniqueVerifyIT {
     public MigrationManagerRule migrationManagerRule;
 
     @Inject
+    public UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+
+    @Inject
     public EntityCollectionManagerFactory cmf;
 
     @Test
@@ -142,4 +153,64 @@ public class WriteUniqueVerifyIT {
         entity.setField( new StringField("foo", "bar"));
         entityManager.write( entity ).toBlocking().last();
     }
+
+    @Test
+    public void testConflictReadRepair() throws Exception {
+
+        final Id appId = new SimpleId("testNoConflict");
+
+
+
+        final ApplicationScope scope = new ApplicationScopeImpl( appId);
+
+        final EntityCollectionManager entityManager = 
cmf.createCollectionManager( scope );
+
+        final Entity entity = TestEntityGenerator.generateEntity();
+        entity.setField(new StringField("name", "Porsche 911 GT3", true));
+        entity.setField(new StringField("identifier", "911gt3", true));
+        entity.setField(new IntegerField("top_speed_mph", 194));
+        entityManager.write( entity ).toBlocking().last();
+
+
+        FieldSet fieldSet =
+            entityManager.getEntitiesFromFields("test", 
Collections.singletonList(entity.getField("name")), true)
+            .toBlocking().last();
+
+        MvccEntity entityFetched = fieldSet.getEntity( entity.getField("name") 
);
+
+
+        final Entity entityDuplicate = TestEntityGenerator.generateEntity();
+        UniqueValue uniqueValue = new UniqueValueImpl(new StringField("name", 
"Porsche 911 GT3", true),
+            entityDuplicate.getId(), UUIDGenerator.newTimeUUID());
+
+        // manually insert a record to simulate a 'duplicate' trying to be 
inserted
+        uniqueValueSerializationStrategy.
+            write(scope, uniqueValue).execute();
+
+
+
+        FieldSet fieldSetAgain =
+            entityManager.getEntitiesFromFields("test", 
Collections.singletonList(entity.getField("name")), true)
+                .toBlocking().last();
+
+        MvccEntity entityFetchedAgain = fieldSetAgain.getEntity( 
entity.getField("name") );
+
+        assertEquals(entityFetched, entityFetchedAgain);
+
+
+        // now test writing the original entity again ( simulates a PUT )
+        // this should read repair and work
+        entityManager.write( entity ).toBlocking().last();
+
+        FieldSet fieldSetAgainAgain =
+            entityManager.getEntitiesFromFields("test", 
Collections.singletonList(entity.getField("name")), true)
+                .toBlocking().last();
+
+        MvccEntity entityFetchedAgainAgain = fieldSetAgainAgain.getEntity( 
entity.getField("name") );
+
+        assertEquals(entityFetched, entityFetchedAgainAgain);
+
+
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index ed3e42b..3dbf1ec 100644
--- 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 
+import com.netflix.astyanax.model.ConsistencyLevel;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -369,7 +370,8 @@ public abstract class 
UniqueValueSerializationStrategyImplTest {
         strategy.write( scope, stored2 ).execute();
 
         // load descending to get the older version of entity for this unique 
value
-        UniqueValueSet fields = strategy.load( scope, entityId1.getType(), 
Collections.<Field>singleton( field ));
+        UniqueValueSet fields = strategy.load( scope, 
ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         UniqueValue retrieved = fields.getValue( field.getName() );
 
@@ -383,7 +385,8 @@ public abstract class 
UniqueValueSerializationStrategyImplTest {
         strategy.write( scope, stored3 ).execute();
 
         // load the values again, we should still only get back the original 
unique value
-        fields = strategy.load( scope, entityId1.getType(), 
Collections.<Field>singleton( field ));
+        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         retrieved = fields.getValue( field.getName() );
 
@@ -396,7 +399,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         strategy.write( scope, stored4 ).execute();
 
         // load the values again, now we should get the latest version of the original UUID written
-        fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field ));
+        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         retrieved = fields.getValue( field.getName() );
 
@@ -433,7 +437,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         strategy.write( scope, stored2 ).execute();
 
         // load descending to get the older version of entity for this unique value
-        UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field ));
+        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         UniqueValue retrieved = fields.getValue( field.getName() );
         Assert.assertNotNull( retrieved );
@@ -470,7 +475,12 @@ public abstract class UniqueValueSerializationStrategyImplTest {
 
 
         // load descending to get the older version of entity for this unique value
-        UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field ));
+        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
+
+
+        fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), false);
 
         UniqueValue retrieved = fields.getValue( field.getName() );
         assertEquals( stored3, retrieved );
@@ -506,7 +516,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
 
 
         // load descending to get the older version of entity for this unique value
-        UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field ));
+        UniqueValueSet fields = strategy.load( scope,
+            ConsistencyLevel.CL_LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         UniqueValue retrieved = fields.getValue( field.getName() );
         assertEquals( stored1, retrieved );
@@ -549,7 +560,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
 
 
         // load descending to get the older version of entity for this unique value
-        UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field ));
+        UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+            entityId1.getType(), Collections.<Field>singleton( field ), true);
 
         UniqueValue retrieved = fields.getValue( field.getName() );
         assertEquals( stored1, retrieved );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
index b0b3791..5380e00 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
@@ -116,7 +116,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         }
 
         // check if the role was assigned
-        assertEquals(status, 404);
+        assertEquals(404, status);
     }
 
 
@@ -167,7 +167,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
             fail("Should not have been able to retrieve the user as it was deleted");
         }catch (ClientErrorException e){
             status=e.getResponse().getStatus();
-            assertEquals( 404,status );
+            assertEquals( 404, status );
         }
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
index 7539e0c..14b1df1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.services.ServiceResults.Type;
 import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
 import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import static org.apache.usergrid.utils.ClassUtils.cast;
 
 
@@ -125,16 +126,34 @@ public class AbstractCollectionService extends AbstractService {
             checkPermissionsForEntity( context, entity );
         }
 
-        // the context of the entity they're trying to load isn't owned by the owner
-        // in the path, don't return it
+        // check ownership based on graph
         if ( !em.isCollectionMember( context.getOwner(), context.getCollectionName(), entity ) ) {
-            logger.info( "Someone tried to GET entity {} they don't own. Entity id {} with owner {}",
-                    getEntityType(), id, context.getOwner()
-            );
-            throw new ServiceResourceNotFoundException( context );
-        }
 
-        // TODO check that entity is in fact in the collection
+            // the entity is already loaded in the scope of the owner and type ( collection ) so it must exist at this point
+            // if for some reason it's not a member of the collection, it should be and read repair it
+            if( context.getOwner().getType().equals(TYPE_APPLICATION) ){
+                logger.warn( "Edge missing for entity id {} with owner {}. Executing edge read repair to create new edge in " +
+                    "collection {}", id, context.getOwner(), context.getCollectionName());
+
+                em.addToCollection( context.getOwner(), context.getCollectionName(), entity);
+
+                // do a final check to be absolutely sure we're good now before returning back to the client
+                // TODO : Keep thinking if the double-check read after repair is necessary.  Favoring stability here
+                if ( !em.isCollectionMember( context.getOwner(), context.getCollectionName(), entity ) ) {
+                    logger.error( "Edge read repair failed for entity id {} with owner {} in collection {}",
+                        id, context.getOwner(), context.getCollectionName());
+
+                    throw new ServiceResourceNotFoundException( context );
+                }
+
+            }
+            // if not head application, then we can't assume the ownership is meant to be there
+            else{
+                throw new ServiceResourceNotFoundException( context );
+            }
+
+
+        }
 
         List<ServiceRequest> nextRequests = context.getNextServiceRequests( entity );
 
@@ -158,7 +177,7 @@ public class AbstractCollectionService extends AbstractService {
         if ( entityId == null ) {
 
             if (logger.isTraceEnabled()) {
-                logger.trace("miss on entityType: {} with name: {}", getEntityType(), name);
+                logger.trace("Miss on entityType: {} with name: {}", getEntityType(), name);
             }
 
             String msg = "Cannot find entity with name: "+name;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/724968a2/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 57f0bb2..8c2be2c 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -24,11 +24,14 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.cassandra.ClearShiroSubject;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
+import org.apache.usergrid.utils.InflectionUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -221,4 +224,25 @@ public class ServiceInvocationIT extends AbstractServiceIT {
         app.testRequest( ServiceAction.GET, 1,
                 "projects", project.getName(), "contains", "contributors", contributor.getName());
     }
+
+    @Test
+    public void testGetByIdAndNameEdgeReadRepair() throws Exception {
+
+        EntityManager em = setup.getEmf().getEntityManager( app.getId() );
+
+        Entity contributor = app.doCreate( "contributor", "Malaka" );
+
+        EntityRef appRef = new SimpleEntityRef("application", app.getId());
+
+
+        em.removeItemFromCollection(appRef, InflectionUtils.pluralize(contributor.getType()), contributor);
+
+        assertFalse("Entity should not have an edge from app to entity",
+            em.isCollectionMember(appRef, InflectionUtils.pluralize(contributor.getType()), contributor));
+
+        app.testRequest( ServiceAction.GET, 1, "contributor", contributor.getName());
+
+        assertTrue("Entity should now be member of the collection",
+            em.isCollectionMember(appRef, InflectionUtils.pluralize(contributor.getType()), contributor));
+    }
 }

Reply via email to