Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 5594d7451 -> 0714b1100


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index b0f3995..8be93f6 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -20,40 +20,54 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.List;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.index.impl.IndexRequest;
 import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 
-@RunWith(EsRunner.class)
-@UseModules({ TestIndexModule.class })
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
 public class IndexServiceTest {
 
     @Inject
@@ -63,24 +77,34 @@ public class IndexServiceTest {
     @Inject
     public GraphManagerFactory graphManagerFactory;
 
+    @Inject
+    public EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+    @Inject
+    public  IndexFig indexFig;
+
     public GraphManager graphManager;
 
     public ApplicationScope applicationScope;
 
+
     @Before
-    public void setup(){
-        applicationScope = getApplicationScope( UUIDGenerator.newTimeUUID());
+    public void setup() {
+        applicationScope = getApplicationScope( UUIDGenerator.newTimeUUID() );
 
         graphManager = graphManagerFactory.createEdgeManager( applicationScope 
);
     }
 
 
     @Test
-    public void testSingleIndexFromSource(){
-        final Entity entity = new Entity( createId( "test" ), 
UUIDGenerator.newTimeUUID());
+    public void testSingleIndexFromSource() {
+        final Entity entity = new Entity( createId( "test" ), 
UUIDGenerator.newTimeUUID() );
         entity.setField( new StringField( "string", "foo" ) );
 
-        final Edge collectionEdge =  createCollectionEdge( 
applicationScope.getApplication(), "tests", entity.getId() );
+        final Edge collectionEdge = createCollectionEdge( 
applicationScope.getApplication(), "tests", entity.getId() );
 
         //write the edge
         graphManager.writeEdge( collectionEdge ).toBlocking().last();
@@ -91,22 +115,199 @@ public class IndexServiceTest {
 
 
         //real users should never call to blocking, we're not sure what we'll 
get
-        final IndexOperationMessage results =  indexed.toBlocking().last();
+        final IndexOperationMessage results = indexed.toBlocking().last();
 
         final Set<IndexRequest> indexRequests = results.getIndexRequests();
 
         //ensure our value made it to the index request
         final IndexRequest indexRequest = indexRequests.iterator().next();
 
-        assertNotNull(indexRequest);
+        assertNotNull( indexRequest );
+    }
+
+
+
+//    @Test( timeout = 60000 )
+    @Test( )
+    public void testSingleCollectionConnection() throws InterruptedException {
+
+
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), 
"application" ) );
+
+
+        final Entity testEntity = new Entity( createId( "thing" ), 
UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write the entity before indexing
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), "things", testEntity.getId() );
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+
+
+
+        final Id connectingId = createId( "connecting" );
+        final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, 
"likes", testEntity.getId() );
+
+
+        final Edge connectionSearch = graphManager.writeEdge( edge 
).toBlocking().last();
+
+
+
+
+        //now index
+        final int batches = indexService.indexEntity( applicationScope, 
testEntity ).count().toBlocking().last();
+
+
+        assertEquals(1, batches);
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+
+        final CountDownLatch latch = new CountDownLatch( 1 );
+
+
+        //query until it's available
+        final CandidateResults collectionResults = getResults( 
applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
+
+        assertEquals( 1, collectionResults.size() );
+
+        assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
+
+        latch.await();
+
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
 
+        //query until it's available
+        final CandidateResults connectionResults = getResults( 
applicationEntityIndex, connectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
 
+        assertEquals( 1, connectionResults.size() );
 
-//        assertEquals(applicationScope.getApplication(), indexRequest.);
-//        assertEquals(collectionEdge.getTimestamp(), edge.getTimestamp());
-//        assertEquals(collectionEdge.getType(), edge.getEdgeName());
-//        assertEquals( SearchEdge.NodeType.TARGET, edge.getNodeType());
+        assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
     }
 
+    /**
+     * Tests that when we have large connections, we batch appropriately
+     * @throws InterruptedException
+     */
+//    @Test( timeout = 60000 )
+    @Test( )
+    public void testConnectingIndexingBatches() throws InterruptedException {
 
+
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), 
"application" ) );
+
+
+        final Entity testEntity = new Entity( createId( "thing" ), 
UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write the entity before indexing
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), "things", testEntity.getId() );
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+
+
+        /**
+         * Write edgeCount edges, 10 at a time in parallel
+         */
+
+        final int edgeCount = 2000;
+
+        final Edge connectionSearch = Observable.range( 0, edgeCount 
).flatMap( integer -> {
+            final Id connectingId = createId( "connecting" );
+            final Edge edge = CpNamingUtils.createConnectionEdge( 
connectingId, "likes", testEntity.getId() );
+
+            return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() 
);
+        }, 10 ).toBlocking().last();
+
+
+        //now index
+        final int batches = indexService.indexEntity( applicationScope, 
testEntity ).count().toBlocking().last();
+
+        //take our edge count + 1 and divide by the batch size, rounding up
+        final int expectedSize = ( int ) Math.ceil( ( (double)edgeCount + 1 ) 
/ indexFig.getIndexBatchSize() );
+
+        assertEquals(expectedSize, batches);
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+
+        final CountDownLatch latch = new CountDownLatch( 1 );
+
+
+        //query until it's available
+        final CandidateResults collectionResults = getResults( 
applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
+
+        assertEquals( 1, collectionResults.size() );
+
+        assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
+
+        latch.await();
+
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
+
+        //query until it's available
+        final CandidateResults connectionResults = getResults( 
applicationEntityIndex, connectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
+
+        assertEquals( 1, connectionResults.size() );
+
+        assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
+    }
+
+
+    private CandidateResults getResults( final ApplicationEntityIndex 
applicationEntityIndex,
+                                         final SearchEdge searchEdge, final 
SearchTypes searchTypes, final String ql,
+                                         final int count, final int 
expectedSize, final int attempts ) {
+
+
+        for ( int i = 0; i < attempts; i++ ) {
+            final CandidateResults candidateResults =
+                applicationEntityIndex.search( searchEdge, searchTypes, 
"select *", 100 );
+
+            if ( candidateResults.size() == expectedSize ) {
+                return candidateResults;
+            }
+
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                //swallow
+            }
+        }
+
+        fail( "Could not find candidates of size " + expectedSize + "after " + 
attempts + " attempts" );
+
+        //we'll never reach this, required for compile
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
index 5e9a5a1..aa95ae8 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
@@ -20,146 +20,87 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
+import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
 
 import net.jcip.annotations.NotThreadSafe;
 
-import static org.junit.Assert.assertTrue;
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 
-@RunWith(EsRunner.class)
-@UseModules({ TestIndexModule.class })
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
 @NotThreadSafe
-public class SQSAsyncIndexServiceTest {
-
+public class SQSAsyncIndexServiceTest extends AsyncIndexServiceTest {
 
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
 
 
     @Rule
     public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
 
+
+
     @Inject
     public QueueManagerFactory queueManagerFactory;
 
     @Inject
-    public QueryFig queryFig;
+    public IndexProcessorFig indexProcessorFig;
 
 
     @Inject
     public MetricsFactory metricsFactory;
 
+    @Inject
+    public IndexService indexService;
 
-    private SQSAsyncReIndexService bufferQueueSQS;
-
-    @Before
-    public void setup(){
-        bufferQueueSQS = new SQSAsyncReIndexService( queueManagerFactory, 
queryFig, metricsFactory );
-    }
-
-
-
+    @Inject
+    public RxTaskScheduler rxTaskScheduler;
 
-    @Test
-    public void testMessageIndexing(){
-
-        fail("fix me");
-//        ApplicationScope applicationScope = new ApplicationScopeImpl(new 
SimpleId(UUID.randomUUID(),"application"));
-//        final UsergridAwsCredentialsProvider ugProvider = new 
UsergridAwsCredentialsProvider();
-//        assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null 
);
-//        assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
-//
-//        final Map<String, Object> request1Data  = new HashMap<String, 
Object>() {{put("test", "testval1");}};
-//        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", 
"testDoc1",request1Data );
-//
-//
-//        final Map<String, Object> request2Data  = new HashMap<String, 
Object>() {{put("test", "testval2");}};
-//        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", 
"testDoc2",request2Data );
-//
-//
-//        //de-index request
-//        final DeIndexRequest
-//            deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, 
index1.2"}, applicationScope, new SearchEdgeImpl(new 
SimpleId("testId3"),"name3",
-//
-//
-//                SearchEdge.NodeType.SOURCE ),  new SimpleId("id3"), 
UUID.randomUUID() );
-//
-//        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new 
String[]{"index2.1", "index2.1"}, applicationScope,  new SearchEdgeImpl(new 
SimpleId("testId4"),"name4",
-//                SearchEdge.NodeType.SOURCE ),  new SimpleId("id4"), 
UUID.randomUUID()  );
-//
-//
-//
-//
-//        IndexOperationMessage indexOperationMessage = new 
IndexOperationMessage();
-//        indexOperationMessage.addIndexRequest( indexRequest1);
-//        indexOperationMessage.addIndexRequest( indexRequest2);
-//
-//        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
-//        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
-//
-//        bufferQueueSQS.offer( indexOperationMessage );
-//
-//        //wait for it to send to SQS
-//        indexOperationMessage.getFuture().get();
-//
-//        //now get it back
-//
-//        final List<IndexOperationMessage> ops = getResults( 20, 
TimeUnit.SECONDS );
-//
-//        assertTrue(ops.size() > 0);
-//
-//        final IndexOperationMessage returnedOperation = ops.get( 0 );
-//
-//         //get the operations out
-//
-//        final Set<IndexRequest> indexRequestSet = 
returnedOperation.getIndexRequests();
-//
-//        assertTrue(indexRequestSet.contains(indexRequest1));
-//        assertTrue(indexRequestSet.contains(indexRequest2));
-//
-//
-//        final Set<DeIndexRequest> deIndexRequests = 
returnedOperation.getDeIndexRequests();
-//
-//        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
-//        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
-//
-//
-//
-//        //now ack the message
-//
-//        bufferQueueSQS.ack( ops );
 
+    @Override
+    protected AsyncIndexService getAsyncIndexService() {
+        return  new SQSAsyncIndexService( queueManagerFactory, 
indexProcessorFig, metricsFactory, indexService,
+                    entityCollectionManagerFactory, rxTaskScheduler );
     }
 
-//    private List<IndexOperationMessage> getResults(final long timeout, final 
TimeUnit timeUnit){
-//        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( 
timeout );
-//
-//        List<IndexOperationMessage> ops;
-//
-//        do{
-//            ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
-//        }while((ops == null || ops.size() == 0 ) &&  
System.currentTimeMillis() < endTime);
-//
-//        return ops;
-//    }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
index b83b806..ccb235b 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
@@ -20,17 +20,31 @@
 package 
org.apache.usergrid.persistence.collection.serialization.impl.migration;
 
 
+import java.io.Serializable;
+
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+
 /**
  * Tuple containing collectionscope and entityid
  */
-public class EntityIdScope{
-    private final Id id;
-    private final ApplicationScope applicationScope;
+public class EntityIdScope implements Serializable {
+    private Id id;
+    private ApplicationScope applicationScope;
+
 
-    public EntityIdScope(ApplicationScope applicationScope, Id id){
+    /**
+     * DO NOT DELETE!!  Required for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public EntityIdScope() {
+    }
+
+
+    public EntityIdScope( ApplicationScope applicationScope, Id id ) {
         this.id = id;
         this.applicationScope = applicationScope;
     }
@@ -44,4 +58,22 @@ public class EntityIdScope{
     public ApplicationScope getApplicationScope() {
         return applicationScope;
     }
+
+
+    /**
+     * DO NOT DELETE!!  Required for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public void setApplicationScope( final ApplicationScope applicationScope ) 
{
+        this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * DO NOT DELETE!!  Required for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public void setId( final Id id ) {
+        this.id = id;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationRelationship.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationRelationship.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationRelationship.java
index 806768e..47ddf4d 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationRelationship.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationRelationship.java
@@ -59,13 +59,19 @@ public class MigrationRelationship<T extends VersionedData> 
{
 
 
     /**
-     * Return true if this is the migration relationship we should use.  The 
version matches the from
-     * and is <= the to
-     * @param currentVersion
-     * @return
+     * Return the span between the current version and this relationship's from version.  The 
current version must be >= the from and <= the to for the relationship to be usable
+     *
+     * @return The span.  Minimum span should be used.  Integer.MAX_VALUE 
means this span is unsupported.
      */
-    public boolean correctRelationship(final int currentVersion){
-        return currentVersion == fromVersion && currentVersion <= toVersion;
+    public int getSpan( final int currentVersion ) {
+
+        //current version is in our range, find its delta from our min version
+        if ( currentVersion >= fromVersion && currentVersion <= toVersion ) {
+            //we return our distance from the fromVersion.  Distance from 0 is 
what matters, so we take the absolute value
+            return Math.abs( fromVersion - currentVersion );
+        }
+
+        return Integer.MAX_VALUE;
     }
 
 
@@ -97,4 +103,15 @@ public class MigrationRelationship<T extends VersionedData> 
{
         result = 31 * result + to.hashCode();
         return result;
     }
+
+
+    @Override
+    public String toString() {
+        return "MigrationRelationship{" +
+            "from=" + from +
+            ", to=" + to +
+            ", fromVersion=" + fromVersion +
+            ", toVersion=" + toVersion +
+            '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSet.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSet.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSet.java
index cadd919..dd0e979 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSet.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSet.java
@@ -83,35 +83,46 @@ public class VersionedMigrationSet<T extends VersionedData> 
{
     /**
      * Get the migration relationship based on our current version. This will 
return a range that includes the current
      * system version as the source, and the highest version we can roll to in 
the to field
-     * @param currentVersion
+     *
      * @return The MigrationRelationship.  Note the from and the to could be 
the same version in a current system.
      */
-    public MigrationRelationship<T> getMigrationRelationship( final int 
currentVersion ){
+    public MigrationRelationship<T> getMigrationRelationship( final int 
currentVersion ) {
 
         final MigrationRelationship<T> relationship = cacheVersion.get( 
currentVersion );
 
-        if(relationship != null){
+        if ( relationship != null ) {
             return relationship;
         }
 
-        //not there, find it.  Not the most efficient, but it happens once per 
version, which rarely changes, so not a big deal
+        //not there, find it.  Not the most efficient, but it happens once per 
version, which rarely changes, so not
+        // a big deal
 
+        int lastSpan = Integer.MAX_VALUE;
+        MigrationRelationship<T> toUse = null;
 
-        for(MigrationRelationship<T> current: orderedVersions){
+        for ( MigrationRelationship<T> current : orderedVersions ) {
 
             //not our instance, the from is too high
             //our from is this instance, so we support this tuple.  Our future 
is >= as well, so we can perform this I/O
-            if ( current.correctRelationship( currentVersion )) {
-                cacheVersion.put( currentVersion, current );
-                return current;
-            }
 
+            final int newSpan = current.getSpan( currentVersion );
+
+            if ( newSpan < lastSpan ) {
+                lastSpan = newSpan;
+                toUse = current;
+            }
         }
 
         //if we get here, something is wrong
-        throw new IllegalArgumentException( "Could not find a migration 
version for version " + currentVersion + " min found was " + 
orderedVersions.get( orderedVersions.size()-1 ) );
+        if ( lastSpan == Integer.MAX_VALUE ) {
+            throw new IllegalArgumentException(
+                "Could not find a migration version for version " + 
currentVersion + " min found was " + orderedVersions
+                    .get( orderedVersions.size() - 1 ) );
+        }
 
 
+        cacheVersion.put( currentVersion, toUse );
+        return toUse;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
index 7986132..ac45967 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -35,14 +35,14 @@ public interface RxSchedulerFig extends GuicyFig {
      * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
      * backpressure
      */
-    public static final String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+    String IO_SCHEDULER_THREADS = "scheduler.io.threads";
 
 
     /**
      * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
      * backpressure
      */
-    public static final String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+    String IO_SCHEDULER_NAME = "scheduler.io.poolName";
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
index 5d22eff..aa879c9 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
@@ -22,11 +22,15 @@ package org.apache.usergrid.persistence.core.scope;
 import java.io.Serializable;
 
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
 
 /**
  * A scope used for organizations
  */
+@JsonDeserialize(as = ApplicationScopeImpl.class)
 public interface ApplicationScope extends Serializable {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
index 5733e3a..45a50ba 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
@@ -31,9 +31,18 @@ import com.google.common.base.Preconditions;
  */
 public class ApplicationScopeImpl implements ApplicationScope {
 
-    protected final Id application;
+    protected Id application;
 
 
+
+    /**
+     * Do not delete!  Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public ApplicationScopeImpl(){
+
+    }
+
     public ApplicationScopeImpl( final Id application ) {
         Preconditions.checkNotNull(application, "application id is required");
         this.application = application;
@@ -46,6 +55,16 @@ public class ApplicationScopeImpl implements ApplicationScope {
     }
 
 
+
+    /**
+     * Do not delete!  Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public void setApplication( final Id application ) {
+        this.application = application;
+    }
+
+
     @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSetTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSetTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSetTest.java
index fc23cb4..e498b07 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSetTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/VersionedMigrationSetTest.java
@@ -114,17 +114,13 @@ public class VersionedMigrationSetTest {
                 new MigrationRelationship<>( new TestVersionImpl( 4 ), new TestVersionImpl( 6 ) );
 
 
-        final MigrationRelationship<TestVersionImpl> relationship5_6 =
-                new MigrationRelationship<>( new TestVersionImpl( 5 ), new TestVersionImpl( 6 ) );
-
-
         final MigrationRelationship<TestVersionImpl> current =
                 new MigrationRelationship<>( new TestVersionImpl( 6 ), new TestVersionImpl( 6 ) );
 
 
         final VersionedMigrationSet<TestVersionImpl> set =
                 new VersionedMigrationSet<>( relationship1_3, relationship2_3, relationship3_6, relationship4_6,
-                        relationship5_6, current );
+                        current );
 
 
         try {
@@ -156,7 +152,7 @@ public class VersionedMigrationSetTest {
 
         migrationRelationship = set.getMigrationRelationship( 5 );
 
-        assertEquals( relationship5_6, migrationRelationship );
+        assertEquals( relationship4_6, migrationRelationship );
 
         migrationRelationship = set.getMigrationRelationship( 6 );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
index a371340..39014ab 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
@@ -22,11 +22,14 @@ package org.apache.usergrid.persistence.model.entity;
 import java.io.Serializable;
 import java.util.UUID;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
 
 /**
  * Interface for creating identifiers for an entity. The implementation should implement
  * the equals and hasCode methods
  * @author tnine */
+@JsonDeserialize(as = SimpleId.class)
 public interface Id extends Comparable<Id>, Serializable {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
index 7ad4ab4..018e90b 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
@@ -25,16 +25,29 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.model.util.Verify;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.common.base.Preconditions;
 
 
 /** @author tnine */
+@JsonTypeInfo( use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class" )
 public class SimpleId implements Id, Serializable {
 
 
-    private final UUID uuid;
-    private final String type;
+    private UUID uuid;
+    private String type;
+
+
+    /**
+     * Do not delete!  Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public SimpleId(){
+
+    }
+
+
 
     public SimpleId( final UUID uuid, final String type ) {
         Preconditions.checkNotNull( uuid, "uuid is required" );
@@ -67,6 +80,26 @@ public class SimpleId implements Id, Serializable {
     }
 
 
+
+    /**
+     * Do not delete!  Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public void setType( final String type ) {
+        this.type = type;
+    }
+
+
+
+    /**
+     * Do not delete!  Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public void setUuid( final UUID uuid ) {
+        this.uuid = uuid;
+    }
+
+
     @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 1a60026..ad9b91f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -60,20 +60,12 @@ public class IndexOperationMessage implements Serializable {
     }
 
 
-    public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
-        this.indexRequests.addAll( indexRequests );
-    }
-
 
     public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
         this.deIndexRequests.add( deIndexRequest );
     }
 
 
-    public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
-        this.deIndexRequests.addAll( deIndexRequests );
-    }
-
 
     public Set<IndexRequest> getIndexRequests() {
         return indexRequests;

Reply via email to