Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o a143ddfea -> 43b0ba64a


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index fb4800c..beaeab4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence;
 import com.google.inject.AbstractModule;
 import com.google.inject.multibindings.Multibinder;
 
+import 
org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
 import 
org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
@@ -52,6 +53,7 @@ public class GuiceModule  extends AbstractModule {
         bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
 
         Multibinder<DataMigration> dataMigrationMultibinder = 
Multibinder.newSetBinder( binder(), DataMigration.class );
+        dataMigrationMultibinder.addBinding().to( 
EntityTypeMappingMigration.class );
         dataMigrationMultibinder.addBinding().to( 
GraphShardVersionMigration.class );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
index 9f4d4d8..580d128 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
@@ -45,6 +45,11 @@ public class NamingUtils {
     public static final  UUID DEFAULT_APPLICATION_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
 
+    /**
+     * The name of the map that holds our entity id->type mapping
+     */
+    public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
+
 
     /**
      * Get the application scope from the given uuid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
new file mode 100644
index 0000000..36d2e60
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.NamingUtils;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+
+import com.google.inject.Inject;
+
+import rx.functions.Action1;
+
+
+/**
+ * Migration that writes each entity's uuid -> type mapping into our map data
+ */
+public class EntityTypeMappingMigration implements DataMigration {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( 
EntityTypeMappingMigration.class );
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final MapManagerFactory mapManagerFactory;
+
+
+    @Inject
+    public EntityTypeMappingMigration( final GraphManagerFactory 
graphManagerFactory,
+                                       final MapManagerFactory 
mapManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.mapManagerFactory = mapManagerFactory;
+    }
+
+
+    @Override
+    public void migrate( final ProgressObserver observer ) throws Throwable {
+
+        final AtomicLong atomicLong = new AtomicLong();
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( 
graphManagerFactory )
+                                     .doOnNext( new 
Action1<AllEntitiesInSystemObservable.EntityData>() {
+
+
+                                         @Override
+                                         public void call( final 
AllEntitiesInSystemObservable.EntityData entityData ) {
+
+                                             final MapScope ms = new 
MapScopeImpl( entityData.entityId,
+                                                     
NamingUtils.TYPES_BY_UUID_MAP );
+
+
+                                             final MapManager mapManager = 
mapManagerFactory.createMapManager( ms );
+
+                                             final UUID entityUuid = 
entityData.entityId.getUuid();
+                                             final String entityType = 
entityData.entityId.getType();
+
+                                             mapManager.putString( 
entityUuid.toString(), entityType );
+
+                                             if ( atomicLong.incrementAndGet() 
% 100 == 0 ) {
+                                                 updateStatus( atomicLong, 
observer );
+                                             }
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+    }
+
+
+    private void updateStatus( final AtomicLong counter, final 
ProgressObserver observer ) {
+
+        observer.update( getVersion(), String.format( "Updated %d entities", 
counter.get() ) );
+    }
+
+
+    @Override
+    public int getVersion() {
+        return Versions.VERSION_1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index 7c26f0c..c2573e4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -1,21 +1,21 @@
 /*
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
  *
  */
 
@@ -23,24 +23,25 @@ package org.apache.usergrid.corepersistence.migration;
 
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.usergrid.corepersistence.rx.ApplicationObservable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable;
-import org.apache.usergrid.corepersistence.rx.TargetIdObservable;
 import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.PreviousImpl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import 
org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -53,73 +54,80 @@ import rx.functions.Func1;
 public class GraphShardVersionMigration implements DataMigration {
 
 
-    private final EdgeMetadataSerialization v1Serialization;
+    private static final Logger logger = LoggerFactory.getLogger( 
GraphShardVersionMigration.class );
 
 
     private final EdgeMetadataSerialization v2Serialization;
 
     private final GraphManagerFactory graphManagerFactory;
+    private final Keyspace keyspace;
 
 
     @Inject
-    public GraphShardVersionMigration( @PreviousImpl final 
EdgeMetadataSerialization v1Serialization,
-                                       @CurrentImpl final 
EdgeMetadataSerialization v2Serialization,
-                                       final GraphManagerFactory 
graphManagerFactory ) {
-        this.v1Serialization = v1Serialization;
+    public GraphShardVersionMigration( @CurrentImpl final 
EdgeMetadataSerialization v2Serialization,
+                                       final GraphManagerFactory 
graphManagerFactory, final Keyspace keyspace ) {
         this.v2Serialization = v2Serialization;
         this.graphManagerFactory = graphManagerFactory;
+        this.keyspace = keyspace;
     }
 
 
     @Override
     public void migrate( final ProgressObserver observer ) throws Throwable {
 
-//    TODO, finish this
-// get each applicationid in our system
-//        Observable.create( new ApplicationObservable( graphManagerFactory ) 
) .doOnNext( new Action1<Id>() {
-//                    @Override
-//                    public void call( final Id id ) {
-//
-//                        //set up our application scope and graph manager
-//                        final ApplicationScope applicationScope = new 
ApplicationScopeImpl( id );
-//
-//
-//                        final GraphManager gm = 
graphManagerFactory.createEdgeManager( applicationScope );
-//
-//
-//                        //load all nodes that are targets of our application 
node.  I.E. entities that have been saved
-//                        final Observable<Edge> entityNodes = 
Observable.create( new EdgesFromSourceObservable( applicationScope, id, gm ) );
-//
-//                        //create our application node
-//                        final Observable<Id> applicationNode = 
Observable.just( id );
-//
-//                        //merge both the specified application node and the 
entity node so they all get used
-//                        Observable.merge( applicationNode, entityNodes 
).doOnNext( new Action1<Id>() {
-//                            //load all meta types from and to-and re-save 
them
-//
-//
-//                            @Override
-//                            public void call( final Id id ) {
-//                                //get the edge types from the source, buffer 
them, then re-save them.  This implicity
-//                                // updates target edges as well
-//                                gm.loadEdgesFromSource( new 
SimpleSearchByEdgeType()
-//                                new SimpleSearchEdgeType( id, null, null 
)).buffer( 1000 ).doOnNext(
-//
-//                                        new Action1<List<String>>() {
-//                                            @Override
-//                                            public void call( final 
List<String> strings ) {
-//                                                v2Serialization.writeEdge( 
applicationScope, )
-//                                            }
-//                                        } )
-//                            }
-//                        } );
-//                    }
-//                } );
+        final AtomicLong counter = new AtomicLong();
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( 
graphManagerFactory ).flatMap(
+                new Func1<AllEntitiesInSystemObservable.EntityData, 
Observable<List<Edge>>>() {
+
+
+                    @Override
+                    public Observable<List<Edge>> call( final 
AllEntitiesInSystemObservable.EntityData entityData ) {
+                        logger.info( "Migrating edges from node {} in scope 
{}", entityData.entityId,
+                                entityData.applicationScope );
+
+                        final GraphManager gm = 
graphManagerFactory.createEdgeManager( entityData.applicationScope );
+
+                        //get each edge from this node as a source
+                        return EdgesFromSourceObservable.edgesFromSource( 
entityData.applicationScope,
+                                entityData.entityId, gm )
+
+                                //re-write the edges in v2, in batches of at 
most 1000 edges
+                                .buffer( 1000 ).doOnNext( new 
Action1<List<Edge>>() {
+                                    @Override
+                                    public void call( final List<Edge> edges ) 
{
+
+                                        final MutationBatch batch = 
keyspace.prepareMutationBatch();
+
+                                        for ( final Edge edge : edges ) {
+                                            logger.info( "Migrating meta for 
edge {}", edge );
+                                            final MutationBatch edgeBatch =
+                                                    v2Serialization.writeEdge( 
entityData.applicationScope, edge );
+                                            batch.mergeShallow( edgeBatch );
+                                        }
+
+                                        try {
+                                            batch.execute();
+                                        }
+                                        catch ( ConnectionException e ) {
+                                            throw new RuntimeException( 
"Unable to perform migration", e );
+                                        }
+
+                                        //update the observer so the admin can 
see it
+                                        final long newCount = 
counter.addAndGet( edges.size() );
+
+                                        observer.update( getVersion(), 
String.format("Finished re-writing %d edges", newCount) );
+
+
+                                    }
+                                } );
+                    }
+                } ).toBlocking().lastOrDefault( null );
     }
 
 
     @Override
     public int getVersion() {
-        return Versions.VERSION_1;
+        return Versions.VERSION_2;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index e9ee517..b4fe095 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -31,7 +31,12 @@ import 
org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri
 public class Versions {
 
     /**
-     * Version 1
+     * Version 1 of our mappings
      */
-    public static final int VERSION_1 = 
EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
+    public static final int VERSION_1 = 1;
+
+    /**
+     * Version 2.  Edge meta changes
+     */
+    public static final int VERSION_2 = 
EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
new file mode 100644
index 0000000..dcd6c89
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system 
across all apps.
+ * Note that this only walks each application id graph: it emits the 
applicationId itself plus the target node of each edge that has the applicationId as its
+ * source node
+ */
+public class AllEntitiesInSystemObservable {
+
+
+    /**
+     * Return an observable that emits all entities in the system.
+     */
+    public static Observable<EntityData> getAllEntitiesInSystem( final 
GraphManagerFactory graphManagerFactory ) {
+        //for every application, emit the application node itself plus every 
node that is a target of an edge from it
+        return ApplicationObservable.getAllApplicationIds( graphManagerFactory 
)
+
+                                    .flatMap( new Func1<Id, 
Observable<EntityData>>() {
+                                        @Override
+                                        public Observable<EntityData> call( 
final Id id ) {
+
+                                            //set up our application scope and 
graph manager
+                                            final ApplicationScope 
applicationScope = new ApplicationScopeImpl( id );
+
+
+                                            final GraphManager gm =
+                                                    
graphManagerFactory.createEdgeManager( applicationScope );
+
+
+                                            //load all nodes that are targets 
of our application node.  I.E.
+                                            // entities that have been saved
+                                            final Observable<Id> entityNodes =
+                                                    
TargetIdObservable.getTargetNodes( applicationScope, id, gm );
+
+                                            //create our application node
+                                            final Observable<Id> 
applicationNode = Observable.just( id );
+
+                                            //merge both the specified 
application node and the entity node
+                                            // so they all get used
+                                            return Observable.merge( 
applicationNode, entityNodes )
+                                                             .map( new 
Func1<Id, EntityData>() {
+                                                                 @Override
+                                                                 public 
EntityData call( final Id id ) {
+                                                                     return 
new EntityData( applicationScope, id );
+                                                                 }
+                                                             } );
+                                        }
+                                    } );
+    }
+
+
+    /**
+     * Get the entity data.  Immutable bean for fast access
+     */
+    public static final class EntityData {
+        public final ApplicationScope applicationScope;
+        public final Id entityId;
+
+
+        public EntityData( final ApplicationScope applicationScope, final Id 
entityId ) {
+            this.applicationScope = applicationScope;
+            this.entityId = entityId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 8ead71c..93ba9f0 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.rx;
 
 
+import java.util.Arrays;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action1;
+import rx.functions.Func1;
 
 import static 
org.apache.usergrid.corepersistence.NamingUtils.generateApplicationId;
 import static 
org.apache.usergrid.corepersistence.NamingUtils.getApplicationScope;
@@ -44,27 +47,26 @@ import static 
org.apache.usergrid.corepersistence.NamingUtils.getApplicationScop
 /**
  * An observable that will emit all application stored in the system.
  */
-public class ApplicationObservable implements Observable.OnSubscribe<Id> {
+public class ApplicationObservable  {
 
-    private static final Logger logger = LoggerFactory.getLogger( 
ApplicationObservable.class );
 
-    private final GraphManagerFactory graphManagerFactory;
 
+    /**
+     * Get all applicationIds as an observable
+     * @param graphManagerFactory
+     * @return
+     */
+    public static Observable<Id> getAllApplicationIds( final 
GraphManagerFactory graphManagerFactory ) {
 
-    public ApplicationObservable( final GraphManagerFactory 
graphManagerFactory ) {
-        this.graphManagerFactory = graphManagerFactory;
-    }
+        //emit our 3 hard coded applications that are used to manage the 
system first.
+        //this way consumers can perform whatever work they need to on the 
root system first
 
 
-    @Override
-    public void call( final Subscriber<? super Id> subscriber ) {
+       final Observable<Id> systemIds =  Observable.from( Arrays.asList( 
generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ),
+                generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ),
+                generateApplicationId( NamingUtils.SYSTEM_APP_ID ) ) );
 
 
-        //emit our 3 hard coded applications that are used the manage the 
system first.
-        //this way consumers can perform whatever work they need to on the 
root system first
-        emit( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ), 
subscriber );
-        emit( generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ), 
subscriber );
-        emit( generateApplicationId( NamingUtils.SYSTEM_APP_ID ), subscriber );
 
 
         ApplicationScope appScope = getApplicationScope( 
NamingUtils.SYSTEM_APP_ID );
@@ -75,45 +77,17 @@ public class ApplicationObservable implements 
Observable.OnSubscribe<Id> {
         Id rootAppId = appScope.getApplication();
 
 
-        Observable<Edge> edges = gm.loadEdgesFromSource(
+        Observable<Id> appIds = gm.loadEdgesFromSource(
                 new SimpleSearchByEdgeType( rootAppId, edgeType, 
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        null ) );
-
-
-        final int count = edges.doOnNext( new Action1<Edge>() {
+                        null ) ).map( new Func1<Edge, Id>() {
             @Override
-            public void call( final Edge edge ) {
-                Id applicationId = edge.getTargetNode();
-
-
-                logger.debug( "Emitting applicationId of {}", applicationId );
-
-                emit( applicationId, subscriber );
+            public Id call( final Edge edge ) {
+                return edge.getTargetNode();
             }
-        } )
-                //if we don't want the count, not sure we need to block.  We 
may just need to subscribe
-                .count().toBlocking().last();
+        } );
 
-        logger.debug( "Emitted {} application ids", count );
+        return Observable.merge( systemIds, appIds);
     }
 
 
-    /**
-     * Return false if no more items should be emitted, true otherwise
-     */
-    private boolean emit( final Id appId, final Subscriber<? super Id> 
subscriber ) {
-
-        if ( subscriber.isUnsubscribed() ) {
-            return false;
-        }
-
-        try {
-            subscriber.onNext( appId );
-        }
-        catch ( Throwable t ) {
-            subscriber.onError( t );
-        }
-
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
index b986d56..88efc7b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
@@ -32,42 +32,29 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
-import rx.Subscriber;
 import rx.functions.Func1;
 
 
 /**
  * Emits the id of all nodes that are target nodes for the given source node
  */
-public class EdgesFromSourceObservable implements Observable.OnSubscribe<Edge> 
{
+public class EdgesFromSourceObservable {
 
     private static final Logger logger = LoggerFactory.getLogger( 
EdgesFromSourceObservable.class );
 
-    private final ApplicationScope applicationScope;
-    private final Id sourceNode;
-    private final GraphManager gm;
-
-
-    public EdgesFromSourceObservable( final ApplicationScope applicationScope, 
final Id sourceNode,
-                                      final GraphManager gm ) {
-        this.applicationScope = applicationScope;
-        this.sourceNode = sourceNode;
-        this.gm = gm;
-    }
-
-
-    @Override
-    public void call( final Subscriber<? super Edge> subscriber ) {
-
 
+    /**
+     * Get all edges from the source
+     */
+    public static Observable<Edge> edgesFromSource( final ApplicationScope 
applicationScope, final Id sourceNode,
+                                                    final GraphManager gm ) {
         final Id applicationId = applicationScope.getApplication();
         //only search edge types that start with collections
 
 
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
-                new SimpleSearchEdgeType( sourceNode, null, null ) );
+        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new 
SimpleSearchEdgeType( sourceNode, null, null ) );
 
-        edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
             @Override
             public Observable<Edge> call( final String edgeType ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
index bd5e12f..91ba741 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
@@ -29,47 +29,39 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
+import rx.functions.Func1;
 
 
 /**
  * Emits the id of all nodes that are target nodes from the given source node
  */
-public class TargetIdObservable implements Observable.OnSubscribe<Id> {
+public class TargetIdObservable {
 
     private static final Logger logger = LoggerFactory.getLogger( 
TargetIdObservable.class );
 
-    private final ApplicationScope applicationScope;
-    private final Id sourceNode;
-    private final GraphManager gm;
-
-
-    public TargetIdObservable( final ApplicationScope applicationScope, final 
Id sourceNode, final GraphManager gm ) {
-        this.applicationScope = applicationScope;
-        this.sourceNode = sourceNode;
-        this.gm = gm;
-    }
-
-
-    @Override
-    public void call( final Subscriber<? super Id> subscriber ) {
 
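+    /**
+     * Get all nodes that are target nodes from the sourceNode
+     * @param applicationScope the application scope passed through to the edge lookup
+     * @param sourceNode the node whose outgoing edges are followed
+     * @param gm the graph manager handed to the edge lookup
+     * @return an observable that emits the target node id of each edge found from sourceNode
+     */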
+    public static Observable<Id> getTargetNodes(final ApplicationScope 
applicationScope, final Id sourceNode, final GraphManager gm) {
 
         //only search edge types that start with collections
-        Observable.create( new EdgesFromSourceObservable( applicationScope, 
sourceNode, gm ) )
-                  .doOnNext( new Action1<Edge>() {
+       return EdgesFromSourceObservable.edgesFromSource( applicationScope, 
sourceNode, gm ).map( new Func1<Edge, Id>() {
 
-                      @Override
-                      public void call( Edge edge ) {
 
-                          logger.info( "Emitting targetId of  {}", edge );
+           @Override
+           public Id call( final Edge edge ) {
+               final Id targetNode = edge.getTargetNode();
 
-                          final Id targetNode = edge.getTargetNode();
+               logger.info( "Emitting targetId of {}", edge );
 
 
-                          subscriber.onNext( targetNode );
-                      }
-                  } ).toBlocking().lastOrDefault( null ); // end foreach on 
edges
+               return targetNode;
+           }
+       } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index a8bd363..6e571d1 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -579,6 +579,9 @@ public interface EntityManager {
             String entityType, String propertyName, Object propertyValue ) 
throws Exception;
 
     @Deprecated
+    /**
+     * Get an entity by UUID.  This will return null if the entity is not found
+     */
     public Entity get( UUID id ) throws Exception;
 
     public <A extends Entity> A get( EntityRef entityRef, Class<A> entityClass 
) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index 203fa38..b3aeda7 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -46,7 +46,7 @@ import com.netflix.astyanax.MutationBatch;
 @Singleton
 public class EdgeMetadataSerializationProxyImpl implements 
EdgeMetadataSerialization {
 
-    public static final int MIGRATION_VERSION = 1;
+    public static final int MIGRATION_VERSION = 2;
 
     private final DataMigrationManager dataMigrationManager;
     private final Keyspace keyspace;

Reply via email to