Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o fbee23490 -> f44870477
Refactored naming utilities into their own class to aid in refactoring Created utility observables to make working with migration easier. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0b01afaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0b01afaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0b01afaa Branch: refs/heads/two-dot-o Commit: 0b01afaa446f0c3caef966010faffe52cd7198c7 Parents: d2d1fe7 Author: Todd Nine <[email protected]> Authored: Wed Nov 12 13:43:04 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Wed Nov 12 13:43:04 2014 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/CpEntityManagerFactory.java | 73 ++++-------- .../corepersistence/CpRelationManager.java | 2 +- .../usergrid/corepersistence/CpSetup.java | 2 +- .../HybridEntityManagerFactory.java | 2 +- .../usergrid/corepersistence/NamingUtils.java | 70 +++++++++++ .../migration/GraphShardVersionMigration.java | 84 ++++++++++++- .../rx/ApplicationObservable.java | 119 +++++++++++++++++++ .../rx/EdgesFromSourceObservable.java | 82 +++++++++++++ .../corepersistence/rx/TargetIdObservable.java | 75 ++++++++++++ .../PerformanceEntityRebuildIndexTest.java | 4 +- 11 files changed, 456 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index d67922c..7e8e1b2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -205,7 +205,7 @@ public class CpEntityManager implements EntityManager { this.managerCache = this.emf.getManagerCache(); this.applicationId = applicationId; - applicationScope = this.emf.getApplicationScope( applicationId ); + applicationScope = NamingUtils.getApplicationScope( applicationId ); this.cass = this.emf.cass; this.counterUtils = this.emf.counterUtils; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index d968bc0..6393237 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -94,20 +94,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private Setup setup = null; - public static final Class<DynamicEntity> APPLICATION_ENTITY_CLASS = DynamicEntity.class; - - /** The System Application where we store app and org metadata */ - public static final UUID SYSTEM_APP_ID = - UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3"); - - /** App where we store management info */ - public static final UUID MANAGEMENT_APPLICATION_ID = - UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8"); - - /** TODO Do we need this in two-dot-o? */ - public static final UUID DEFAULT_APPLICATION_ID = - UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9"); - /** Have we already initialized the index for the management app? */ private AtomicBoolean indexInitialized = new AtomicBoolean( ); @@ -130,34 +116,27 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application CassandraService cass; CounterUtils counterUtils; - private boolean skipAggregateCounters; - - private static final int REBUILD_PAGE_SIZE = 100; - public CpEntityManagerFactory( - CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters) { + CassandraService cass, CounterUtils counterUtils) { this.cass = cass; this.counterUtils = counterUtils; - this.skipAggregateCounters = skipAggregateCounters; - if (skipAggregateCounters) { - logger.warn("NOTE: Counters have been disabled by configuration..."); - } + } private void init() { - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); try { if ( em.getApplication() == null ) { logger.info("Creating system application"); Map sysAppProps = new HashMap<String, Object>(); sysAppProps.put( PROPERTY_NAME, "systemapp"); - em.create( SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps ); + em.create( NamingUtils.SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps ); em.getApplication(); em.createIndex(); em.refreshIndex(); @@ -265,7 +244,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Map<String, Object> properties ) throws Exception { - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); final String appName = buildAppName( organizationName, name ); @@ -315,14 +294,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } - public ApplicationScope getApplicationScope( UUID applicationId ) { - - // We can always generate a scope, it doesn't matter if the application exists yet or not. - final ApplicationScopeImpl scope = - new ApplicationScopeImpl( generateApplicationId( applicationId ) ); - - return scope; - } @Override @@ -339,7 +310,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application // Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'"); - EntityManager em = getEntityManager( SYSTEM_APP_ID ); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID ); final EntityRef alias = em.getAlias( "organizations", name ); @@ -369,7 +340,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application public UUID lookupApplication( String name ) throws Exception { init(); - EntityManager em = getEntityManager( SYSTEM_APP_ID ); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID ); final EntityRef alias = em.getAlias( "appinfos", name ); @@ -416,10 +387,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Map<String, UUID> appMap = new HashMap<String, UUID>(); - ApplicationScope appScope = getApplicationScope(SYSTEM_APP_ID); + ApplicationScope appScope = NamingUtils.getApplicationScope( NamingUtils.SYSTEM_APP_ID ); GraphManager gm = managerCache.getGraphManager(appScope); - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); Application app = em.getApplication(); Id fromEntityId = new SimpleId( app.getUuid(), app.getType() ); @@ -472,7 +443,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Map<String, String> props = new HashMap<String,String>(); - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); Query q = Query.fromQL("select *"); Results results = null; try { @@ -497,7 +468,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application @Override public boolean updateServiceProperties(Map<String, String> properties) { - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); Query q = Query.fromQL("select *"); Results results = null; try { @@ -546,7 +517,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application @Override public boolean deleteServiceProperty(String name) { - EntityManager em = getEntityManager(SYSTEM_APP_ID); + EntityManager em = getEntityManager( NamingUtils.SYSTEM_APP_ID); Query q = Query.fromQL("select *"); @@ -604,19 +575,17 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application @Override public UUID getManagementAppId() { - return MANAGEMENT_APPLICATION_ID; + return NamingUtils.MANAGEMENT_APPLICATION_ID; } @Override public UUID getDefaultAppId() { - return DEFAULT_APPLICATION_ID; + return NamingUtils.DEFAULT_APPLICATION_ID; } - private Id generateApplicationId(UUID id){ - return new SimpleId( id, Application.ENTITY_TYPE ); - } + /** * Gets the setup. @@ -661,7 +630,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application return Arrays.asList( getManagerCache().getEntityIndex( - new ApplicationScopeImpl( new SimpleId( SYSTEM_APP_ID, "application" ))), + new ApplicationScopeImpl( new SimpleId( NamingUtils.SYSTEM_APP_ID, "application" ))), // management app getManagerCache().getEntityIndex( @@ -691,9 +660,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application @Override public void rebuildInternalIndexes( ProgressObserver po ) throws Exception { - rebuildApplicationIndexes(SYSTEM_APP_ID, po); - rebuildApplicationIndexes( MANAGEMENT_APPLICATION_ID, po ); - rebuildApplicationIndexes( DEFAULT_APPLICATION_ID, po ); + rebuildApplicationIndexes( NamingUtils.SYSTEM_APP_ID, po); + rebuildApplicationIndexes( NamingUtils.MANAGEMENT_APPLICATION_ID, po ); + rebuildApplicationIndexes( NamingUtils.DEFAULT_APPLICATION_ID, po ); } @@ -750,8 +719,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application // could use any collection scope here, does not matter EntityCollectionManager ecm = getManagerCache().getEntityCollectionManager( new CollectionScopeImpl( - new SimpleId( SYSTEM_APP_ID, "application"), - new SimpleId( SYSTEM_APP_ID, "application"), + new SimpleId( NamingUtils.SYSTEM_APP_ID, "application"), + new SimpleId( NamingUtils.SYSTEM_APP_ID, "application"), "dummy" )); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index f7546b8..af9dfe4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -213,7 +213,7 @@ public class CpRelationManager implements RelationManager { this.applicationId = applicationId; this.headEntity = headEntity; this.managerCache = emf.getManagerCache(); - this.applicationScope = emf.getApplicationScope( applicationId ); + this.applicationScope = NamingUtils.getApplicationScope( applicationId ); this.cass = em.getCass(); // TODO: eliminate need for this via Core Persistence this.indexBucketLocator = indexBucketLocator; // TODO: this also http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java index 48af6a2..9a34114 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java @@ -271,7 +271,7 @@ public class CpSetup implements Setup { static class SystemDefaults { private static final Application managementApp = - new Application( CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID); + new Application( NamingUtils.MANAGEMENT_APPLICATION_ID); // private static final Application defaultApp = // new Application( CpEntityManagerFactory.DEFAULT_APPLICATION_ID ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java index 4daf79a..de0dd2a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java @@ -45,7 +45,7 @@ public class HybridEntityManagerFactory implements EntityManagerFactory, Applica boolean useCP = cass.getPropertiesMap().get("usergrid.persistence").equals("CP"); if ( useCP ) { logger.info("HybridEntityManagerFactory: configured for New Core Persistence engine"); - factory = new CpEntityManagerFactory(cass, counterUtils, skipAggCounters ); + factory = new CpEntityManagerFactory(cass, counterUtils ); } else { logger.info("HybridEntityManagerFactory: configured for Classic Usergrid persistence"); factory = new EntityManagerFactoryImpl( cass, counterUtils, skipAggCounters ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java new file mode 100644 index 0000000..9f4d4d8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.entities.Application; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + + +/** + * Static class to encapsulate naming conventions used through the CP entity system + */ +public class NamingUtils { + + + /** The System Application where we store app and org metadata */ + public static final UUID SYSTEM_APP_ID = + UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3"); + /** App where we store management info */ + public static final UUID MANAGEMENT_APPLICATION_ID = + UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8"); + /** TODO Do we need this in two-dot-o? */ + public static final UUID DEFAULT_APPLICATION_ID = + UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9"); + + + /** + * Get the application scope from the given uuid + * @param applicationId The applicationId + */ + public static ApplicationScope getApplicationScope( UUID applicationId ) { + + // We can always generate a scope, it doesn't matter if the application exists yet or not. + final ApplicationScopeImpl scope = new ApplicationScopeImpl( generateApplicationId( applicationId ) ); + + return scope; + } + + + /** + * Generate an applicationId from the given UUID + * @param applicationId the applicationId + * + */ + public static Id generateApplicationId( UUID applicationId ) { + return new SimpleId( applicationId, Application.ENTITY_TYPE ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java index 92982ba..7c26f0c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java @@ -22,17 +22,99 @@ package org.apache.usergrid.corepersistence.migration; +import java.util.List; + +import org.apache.usergrid.corepersistence.rx.ApplicationObservable; +import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable; +import org.apache.usergrid.corepersistence.rx.TargetIdObservable; +import org.apache.usergrid.persistence.core.guice.CurrentImpl; +import org.apache.usergrid.persistence.core.guice.PreviousImpl; import org.apache.usergrid.persistence.core.migration.data.DataMigration; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; +import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; + +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; /** * Migration for migrating graph edges to the new Shards */ -public class GraphShardVersionMigration implements DataMigration{ +public class GraphShardVersionMigration implements DataMigration { + + + private final EdgeMetadataSerialization v1Serialization; + + + private final EdgeMetadataSerialization v2Serialization; + + private final GraphManagerFactory graphManagerFactory; + + + @Inject + public GraphShardVersionMigration( @PreviousImpl final EdgeMetadataSerialization v1Serialization, + @CurrentImpl final EdgeMetadataSerialization v2Serialization, + final GraphManagerFactory graphManagerFactory ) { + this.v1Serialization = v1Serialization; + this.v2Serialization = v2Serialization; + this.graphManagerFactory = graphManagerFactory; + } + @Override public void migrate( final ProgressObserver observer ) throws Throwable { +// TODO, finish this +// get each applicationid in our system +// Observable.create( new ApplicationObservable( graphManagerFactory ) ) .doOnNext( new Action1<Id>() { +// @Override +// public void call( final Id id ) { +// +// //set up our application scope and graph manager +// final ApplicationScope applicationScope = new ApplicationScopeImpl( id ); +// +// +// final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); +// +// +// //load all nodes that are targets of our application node. I.E. entities that have been saved +// final Observable<Edge> entityNodes = Observable.create( new EdgesFromSourceObservable( applicationScope, id, gm ) ); +// +// //create our application node +// final Observable<Id> applicationNode = Observable.just( id ); +// +// //merge both the specified application node and the entity node so they all get used +// Observable.merge( applicationNode, entityNodes ).doOnNext( new Action1<Id>() { +// //load all meta types from and to-and re-save them +// +// +// @Override +// public void call( final Id id ) { +// //get the edge types from the source, buffer them, then re-save them. This implicity +// // updates target edges as well +// gm.loadEdgesFromSource( new SimpleSearchByEdgeType() +// new SimpleSearchEdgeType( id, null, null )).buffer( 1000 ).doOnNext( +// +// new Action1<List<String>>() { +// @Override +// public void call( final List<String> strings ) { +// v2Serialization.writeEdge( applicationScope, ) +// } +// } ) +// } +// } ); +// } +// } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java new file mode 100644 index 0000000..8ead71c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.NamingUtils; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; + +import static org.apache.usergrid.corepersistence.NamingUtils.generateApplicationId; +import static org.apache.usergrid.corepersistence.NamingUtils.getApplicationScope; + + +/** + * An observable that will emit all application stored in the system. + */ +public class ApplicationObservable implements Observable.OnSubscribe<Id> { + + private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class ); + + private final GraphManagerFactory graphManagerFactory; + + + public ApplicationObservable( final GraphManagerFactory graphManagerFactory ) { + this.graphManagerFactory = graphManagerFactory; + } + + + @Override + public void call( final Subscriber<? super Id> subscriber ) { + + + //emit our 3 hard coded applications that are used the manage the system first. + //this way consumers can perform whatever work they need to on the root system first + emit( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ), subscriber ); + emit( generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ), subscriber ); + emit( generateApplicationId( NamingUtils.SYSTEM_APP_ID ), subscriber ); + + + ApplicationScope appScope = getApplicationScope( NamingUtils.SYSTEM_APP_ID ); + GraphManager gm = graphManagerFactory.createEdgeManager( appScope ); + + String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( "appinfos" ); + + Id rootAppId = appScope.getApplication(); + + + Observable<Edge> edges = gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + null ) ); + + + final int count = edges.doOnNext( new Action1<Edge>() { + @Override + public void call( final Edge edge ) { + Id applicationId = edge.getTargetNode(); + + + logger.debug( "Emitting applicationId of {}", applicationId ); + + emit( applicationId, subscriber ); + } + } ) + //if we don't want the count, not sure we need to block. We may just need to subscribe + .count().toBlocking().last(); + + logger.debug( "Emitted {} application ids", count ); + } + + + /** + * Return false if no more items should be emitted, true otherwise + */ + private boolean emit( final Id appId, final Subscriber<? super Id> subscriber ) { + + if ( subscriber.isUnsubscribed() ) { + return false; + } + + try { + subscriber.onNext( appId ); + } + catch ( Throwable t ) { + subscriber.onError( t ); + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java new file mode 100644 index 0000000..b986d56 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; + + +/** + * Emits the id of all nodes that are target nodes for the given source node + */ +public class EdgesFromSourceObservable implements Observable.OnSubscribe<Edge> { + + private static final Logger logger = LoggerFactory.getLogger( EdgesFromSourceObservable.class ); + + private final ApplicationScope applicationScope; + private final Id sourceNode; + private final GraphManager gm; + + + public EdgesFromSourceObservable( final ApplicationScope applicationScope, final Id sourceNode, + final GraphManager gm ) { + this.applicationScope = applicationScope; + this.sourceNode = sourceNode; + this.gm = gm; + } + + + @Override + public void call( final Subscriber<? super Edge> subscriber ) { + + + final Id applicationId = applicationScope.getApplication(); + //only search edge types that start with collections + + + Observable<String> edgeTypes = gm.getEdgeTypesFromSource( + new SimpleSearchEdgeType( sourceNode, null, null ) ); + + edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { + @Override + public Observable<Edge> call( final String edgeType ) { + + logger.debug( "Loading edges of edgeType {} from {}\n scope {}", + new Object[] { edgeType, sourceNode, applicationScope } ); + + return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( applicationId, edgeType, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, null ) ); + } + } ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java new file mode 100644 index 0000000..bd5e12f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; + + +/** + * Emits the id of all nodes that are target nodes from the given source node + */ +public class TargetIdObservable implements Observable.OnSubscribe<Id> { + + private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class ); + + private final ApplicationScope applicationScope; + private final Id sourceNode; + private final GraphManager gm; + + + public TargetIdObservable( final ApplicationScope applicationScope, final Id sourceNode, final GraphManager gm ) { + this.applicationScope = applicationScope; + this.sourceNode = sourceNode; + this.gm = gm; + } + + + @Override + public void call( final Subscriber<? super Id> subscriber ) { + + + //only search edge types that start with collections + Observable.create( new EdgesFromSourceObservable( applicationScope, sourceNode, gm ) ) + .doOnNext( new Action1<Edge>() { + + @Override + public void call( Edge edge ) { + + logger.info( "Emitting targetId of {}", edge ); + + final Id targetNode = edge.getTargetNode(); + + + subscriber.onNext( targetNode ); + } + } ).toBlocking().lastOrDefault( null ); // end foreach on edges + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0b01afaa/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java index 837dd9d..174976d 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java @@ -40,8 +40,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.usergrid.cassandra.Concurrent; -import org.apache.usergrid.corepersistence.CpEntityManagerFactory; import org.apache.usergrid.corepersistence.CpSetup; +import org.apache.usergrid.corepersistence.NamingUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.index.EntityIndex; @@ -165,7 +165,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { // ----------------- delete the system and application indexes logger.debug("Deleting app index and system app index"); - deleteIndex( CpEntityManagerFactory.SYSTEM_APP_ID ); + deleteIndex( NamingUtils.SYSTEM_APP_ID ); deleteIndex( em.getApplicationId() ); // ----------------- test that we can read them, should fail
