Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o a143ddfea -> 43b0ba64a
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java index fb4800c..beaeab4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence; import com.google.inject.AbstractModule; import com.google.inject.multibindings.Multibinder; +import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration; import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration; import org.apache.usergrid.persistence.collection.guice.CollectionModule; import org.apache.usergrid.persistence.core.guice.CommonModule; @@ -52,6 +53,7 @@ public class GuiceModule extends AbstractModule { bind(CpEntityIndexDeleteListener.class).asEagerSingleton(); Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class ); + dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class ); dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java index 9f4d4d8..580d128 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java @@ -45,6 +45,11 @@ public class NamingUtils { public static final UUID DEFAULT_APPLICATION_ID = UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9"); + /** + * The name of the map that holds our entity id->type mapping + */ + public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz"; + /** * Get the application scope from the given uuid http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java new file mode 100644 index 0000000..36d2e60 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.migration; + + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.NamingUtils; +import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; +import org.apache.usergrid.persistence.core.migration.data.DataMigration; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; + +import com.google.inject.Inject; + +import rx.functions.Action1; + + +/** + * Migration to ensure that our entity id is written into our map data + */ +public class EntityTypeMappingMigration implements DataMigration { + + + private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class ); + + private final GraphManagerFactory graphManagerFactory; + private final MapManagerFactory mapManagerFactory; + + + @Inject + public EntityTypeMappingMigration( final GraphManagerFactory graphManagerFactory, + final MapManagerFactory mapManagerFactory ) { + this.graphManagerFactory = graphManagerFactory; + this.mapManagerFactory = mapManagerFactory; + } + + + @Override + public void migrate( final ProgressObserver observer ) throws Throwable { + + final AtomicLong atomicLong = new AtomicLong(); + + AllEntitiesInSystemObservable.getAllEntitiesInSystem( graphManagerFactory ) + .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + + + @Override + public void call( final AllEntitiesInSystemObservable.EntityData entityData ) { + + final MapScope ms = new MapScopeImpl( entityData.entityId, + NamingUtils.TYPES_BY_UUID_MAP ); + + + final MapManager mapManager = mapManagerFactory.createMapManager( ms ); + + final UUID entityUuid = 
entityData.entityId.getUuid(); + final String entityType = entityData.entityId.getType(); + + mapManager.putString( entityUuid.toString(), entityType ); + + if ( atomicLong.incrementAndGet() % 100 == 0 ) { + updateStatus( atomicLong, observer ); + } + } + } ).toBlocking().lastOrDefault( null ); + } + + + private void updateStatus( final AtomicLong counter, final ProgressObserver observer ) { + + observer.update( getVersion(), String.format( "Updated %d entities", counter.get() ) ); + } + + + @Override + public int getVersion() { + return Versions.VERSION_1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java index 7c26f0c..c2573e4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java @@ -1,21 +1,21 @@ /* * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
* */ @@ -23,24 +23,25 @@ package org.apache.usergrid.corepersistence.migration; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.usergrid.corepersistence.rx.ApplicationObservable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable; -import org.apache.usergrid.corepersistence.rx.TargetIdObservable; import org.apache.usergrid.persistence.core.guice.CurrentImpl; -import org.apache.usergrid.persistence.core.guice.PreviousImpl; import org.apache.usergrid.persistence.core.migration.data.DataMigration; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization; -import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import rx.Observable; import rx.functions.Action1; @@ -53,73 +54,80 @@ import rx.functions.Func1; public class GraphShardVersionMigration implements DataMigration { - private final EdgeMetadataSerialization v1Serialization; + private static final Logger logger = LoggerFactory.getLogger( GraphShardVersionMigration.class ); private final EdgeMetadataSerialization v2Serialization; private final GraphManagerFactory graphManagerFactory; + private 
final Keyspace keyspace; @Inject - public GraphShardVersionMigration( @PreviousImpl final EdgeMetadataSerialization v1Serialization, - @CurrentImpl final EdgeMetadataSerialization v2Serialization, - final GraphManagerFactory graphManagerFactory ) { - this.v1Serialization = v1Serialization; + public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization, + final GraphManagerFactory graphManagerFactory, final Keyspace keyspace ) { this.v2Serialization = v2Serialization; this.graphManagerFactory = graphManagerFactory; + this.keyspace = keyspace; } @Override public void migrate( final ProgressObserver observer ) throws Throwable { -// TODO, finish this -// get each applicationid in our system -// Observable.create( new ApplicationObservable( graphManagerFactory ) ) .doOnNext( new Action1<Id>() { -// @Override -// public void call( final Id id ) { -// -// //set up our application scope and graph manager -// final ApplicationScope applicationScope = new ApplicationScopeImpl( id ); -// -// -// final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); -// -// -// //load all nodes that are targets of our application node. I.E. entities that have been saved -// final Observable<Edge> entityNodes = Observable.create( new EdgesFromSourceObservable( applicationScope, id, gm ) ); -// -// //create our application node -// final Observable<Id> applicationNode = Observable.just( id ); -// -// //merge both the specified application node and the entity node so they all get used -// Observable.merge( applicationNode, entityNodes ).doOnNext( new Action1<Id>() { -// //load all meta types from and to-and re-save them -// -// -// @Override -// public void call( final Id id ) { -// //get the edge types from the source, buffer them, then re-save them. 
This implicity -// // updates target edges as well -// gm.loadEdgesFromSource( new SimpleSearchByEdgeType() -// new SimpleSearchEdgeType( id, null, null )).buffer( 1000 ).doOnNext( -// -// new Action1<List<String>>() { -// @Override -// public void call( final List<String> strings ) { -// v2Serialization.writeEdge( applicationScope, ) -// } -// } ) -// } -// } ); -// } -// } ); + final AtomicLong counter = new AtomicLong(); + + AllEntitiesInSystemObservable.getAllEntitiesInSystem( graphManagerFactory ).flatMap( + new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() { + + + @Override + public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) { + logger.info( "Migrating edges from node {} in scope {}", entityData.entityId, + entityData.applicationScope ); + + final GraphManager gm = graphManagerFactory.createEdgeManager( entityData.applicationScope ); + + //get each edge from this node as a source + return EdgesFromSourceObservable.edgesFromSource( entityData.applicationScope, + entityData.entityId, gm ) + + //for each edge, re-index it in v2 every 1000 edges or less + .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() { + @Override + public void call( final List<Edge> edges ) { + + final MutationBatch batch = keyspace.prepareMutationBatch(); + + for ( final Edge edge : edges ) { + logger.info( "Migrating meta for edge {}", edge ); + final MutationBatch edgeBatch = + v2Serialization.writeEdge( entityData.applicationScope, edge ); + batch.mergeShallow( edgeBatch ); + } + + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to perform migration", e ); + } + + //update the observer so the admin can see it + final long newCount = counter.addAndGet( edges.size() ); + + observer.update( getVersion(), String.format("Finished re-writing %d edges", newCount) ); + + + } + } ); + } + } ).toBlocking().lastOrDefault( null ); } @Override public int getVersion() { - 
return Versions.VERSION_1; + return Versions.VERSION_2; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java index e9ee517..b4fe095 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java @@ -31,7 +31,12 @@ import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri public class Versions { /** - * Version 1 + * Version 1 of our mappings */ - public static final int VERSION_1 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION; + public static final int VERSION_1 = 1; + + /** + * Version 2. Edge meta changes + */ + public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java new file mode 100644 index 0000000..dcd6c89 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.functions.Func1; + + +/** + * An observable that will emit every entity Id stored in our entire system across all apps. + * Note that this only walks each application id graph, and emits edges from the applicationId and its edges as the + * source node + */ +public class AllEntitiesInSystemObservable { + + + /** + * Return an observable that emits all entities in the system. 
+ */ + public static Observable<EntityData> getAllEntitiesInSystem( final GraphManagerFactory graphManagerFactory ) { + //traverse all nodes in the graph, load all source edges from them, then re-save the meta data + return ApplicationObservable.getAllApplicationIds( graphManagerFactory ) + + .flatMap( new Func1<Id, Observable<EntityData>>() { + @Override + public Observable<EntityData> call( final Id id ) { + + //set up our application scope and graph manager + final ApplicationScope applicationScope = new ApplicationScopeImpl( id ); + + + final GraphManager gm = + graphManagerFactory.createEdgeManager( applicationScope ); + + + //load all nodes that are targets of our application node. I.E. + // entities that have been saved + final Observable<Id> entityNodes = + TargetIdObservable.getTargetNodes( applicationScope, id, gm ); + + //create our application node + final Observable<Id> applicationNode = Observable.just( id ); + + //merge both the specified application node and the entity node + // so they all get used + return Observable.merge( applicationNode, entityNodes ) + .map( new Func1<Id, EntityData>() { + @Override + public EntityData call( final Id id ) { + return new EntityData( applicationScope, id ); + } + } ); + } + } ); + } + + + /** + * Get the entity data. 
Immutable bean for fast access + */ + public static final class EntityData { + public final ApplicationScope applicationScope; + public final Id entityId; + + + public EntityData( final ApplicationScope applicationScope, final Id entityId ) { + this.applicationScope = applicationScope; + this.entityId = entityId; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java index 8ead71c..93ba9f0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.rx; +import java.util.Arrays; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; import rx.Subscriber; import rx.functions.Action1; +import rx.functions.Func1; import static org.apache.usergrid.corepersistence.NamingUtils.generateApplicationId; import static org.apache.usergrid.corepersistence.NamingUtils.getApplicationScope; @@ -44,27 +47,26 @@ import static org.apache.usergrid.corepersistence.NamingUtils.getApplicationScop /** * An observable that will emit all application stored in the system. 
*/ -public class ApplicationObservable implements Observable.OnSubscribe<Id> { +public class ApplicationObservable { - private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class ); - private final GraphManagerFactory graphManagerFactory; + /** + * Get all applicationIds as an observable + * @param graphManagerFactory + * @return + */ + public static Observable<Id> getAllApplicationIds( final GraphManagerFactory graphManagerFactory ) { - public ApplicationObservable( final GraphManagerFactory graphManagerFactory ) { - this.graphManagerFactory = graphManagerFactory; - } + //emit our 3 hard coded applications that are used to manage the system first. + //this way consumers can perform whatever work they need to on the root system first - @Override - public void call( final Subscriber<? super Id> subscriber ) { + final Observable<Id> systemIds = Observable.from( Arrays.asList( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ), + generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ), + generateApplicationId( NamingUtils.SYSTEM_APP_ID ) ) ); - //emit our 3 hard coded applications that are used the manage the system first. 
- //this way consumers can perform whatever work they need to on the root system first - emit( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ), subscriber ); - emit( generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ), subscriber ); - emit( generateApplicationId( NamingUtils.SYSTEM_APP_ID ), subscriber ); ApplicationScope appScope = getApplicationScope( NamingUtils.SYSTEM_APP_ID ); @@ -75,45 +77,17 @@ public class ApplicationObservable implements Observable.OnSubscribe<Id> { Id rootAppId = appScope.getApplication(); - Observable<Edge> edges = gm.loadEdgesFromSource( + Observable<Id> appIds = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - null ) ); - - - final int count = edges.doOnNext( new Action1<Edge>() { + null ) ).map( new Func1<Edge, Id>() { @Override - public void call( final Edge edge ) { - Id applicationId = edge.getTargetNode(); - - - logger.debug( "Emitting applicationId of {}", applicationId ); - - emit( applicationId, subscriber ); + public Id call( final Edge edge ) { + return edge.getTargetNode(); } - } ) - //if we don't want the count, not sure we need to block. We may just need to subscribe - .count().toBlocking().last(); + } ); - logger.debug( "Emitted {} application ids", count ); + return Observable.merge( systemIds, appIds); } - /** - * Return false if no more items should be emitted, true otherwise - */ - private boolean emit( final Id appId, final Subscriber<? 
super Id> subscriber ) { - - if ( subscriber.isUnsubscribed() ) { - return false; - } - - try { - subscriber.onNext( appId ); - } - catch ( Throwable t ) { - subscriber.onError( t ); - } - - return true; - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java index b986d56..88efc7b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java @@ -32,42 +32,29 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; -import rx.Subscriber; import rx.functions.Func1; /** * Emits the id of all nodes that are target nodes for the given source node */ -public class EdgesFromSourceObservable implements Observable.OnSubscribe<Edge> { +public class EdgesFromSourceObservable { private static final Logger logger = LoggerFactory.getLogger( EdgesFromSourceObservable.class ); - private final ApplicationScope applicationScope; - private final Id sourceNode; - private final GraphManager gm; - - - public EdgesFromSourceObservable( final ApplicationScope applicationScope, final Id sourceNode, - final GraphManager gm ) { - this.applicationScope = applicationScope; - this.sourceNode = sourceNode; - this.gm = gm; - } - - - @Override - public void call( final Subscriber<? 
super Edge> subscriber ) { - + /** + * Get all edges from the source + */ + public static Observable<Edge> edgesFromSource( final ApplicationScope applicationScope, final Id sourceNode, + final GraphManager gm ) { final Id applicationId = applicationScope.getApplication(); //only search edge types that start with collections - Observable<String> edgeTypes = gm.getEdgeTypesFromSource( - new SimpleSearchEdgeType( sourceNode, null, null ) ); + Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); - edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { + return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { @Override public Observable<Edge> call( final String edgeType ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java index bd5e12f..91ba741 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java @@ -29,47 +29,39 @@ import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; -import rx.Subscriber; -import rx.functions.Action1; +import rx.functions.Func1; /** * Emits the id of all nodes that are target nodes from the given source node */ -public class TargetIdObservable implements Observable.OnSubscribe<Id> { +public class TargetIdObservable { private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class ); - private final ApplicationScope applicationScope; - private final Id sourceNode; - 
private final GraphManager gm; - - - public TargetIdObservable( final ApplicationScope applicationScope, final Id sourceNode, final GraphManager gm ) { - this.applicationScope = applicationScope; - this.sourceNode = sourceNode; - this.gm = gm; - } - - - @Override - public void call( final Subscriber<? super Id> subscriber ) { + /** + * Get all nodes that are target nodes from the sourceNode + * @param applicationScope + * @param sourceNode + * @param gm + * @return + */ + public static Observable<Id> getTargetNodes(final ApplicationScope applicationScope, final Id sourceNode, final GraphManager gm) { //only search edge types that start with collections - Observable.create( new EdgesFromSourceObservable( applicationScope, sourceNode, gm ) ) - .doOnNext( new Action1<Edge>() { + return EdgesFromSourceObservable.edgesFromSource( applicationScope, sourceNode, gm ).map( new Func1<Edge, Id>() { - @Override - public void call( Edge edge ) { - logger.info( "Emitting targetId of {}", edge ); + @Override + public Id call( final Edge edge ) { + final Id targetNode = edge.getTargetNode(); - final Id targetNode = edge.getTargetNode(); + logger.info( "Emitting targetId of {}", edge ); - subscriber.onNext( targetNode ); - } - } ).toBlocking().lastOrDefault( null ); // end foreach on edges + return targetNode; + } + } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index a8bd363..6e571d1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -579,6 +579,9 @@ public interface EntityManager { String entityType, String 
propertyName, Object propertyValue ) throws Exception; @Deprecated + /** + * Get an entity by UUID. This will return null if the entity is not found + */ public Entity get( UUID id ) throws Exception; public <A extends Entity> A get( EntityRef entityRef, Class<A> entityClass ) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java index 203fa38..b3aeda7 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java @@ -46,7 +46,7 @@ import com.netflix.astyanax.MutationBatch; @Singleton public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerialization { - public static final int MIGRATION_VERSION = 1; + public static final int MIGRATION_VERSION = 2; private final DataMigrationManager dataMigrationManager; private final Keyspace keyspace;
