Refactored generation of collection scope to be re-used with CpNamingUtils for consistency
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ed79642 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ed79642 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ed79642 Branch: refs/heads/index-alias Commit: 9ed796423f15a8ef33137b113f6e04c578994b21 Parents: 68f4e0f Author: Todd Nine <[email protected]> Authored: Thu Nov 20 14:15:24 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Thu Nov 20 14:23:38 2014 -0700 ---------------------------------------------------------------------- .../migration/EntityDataMigration.java | 139 +++++++++++++++++ .../migration/EntityTypeMappingMigration.java | 28 ++-- .../migration/GraphShardVersionMigration.java | 104 +++++++------ .../rx/AllEntitiesInSystemObservable.java | 27 ++-- .../corepersistence/StaleIndexCleanupTest.java | 5 +- .../migration/EntityTypeMappingMigrationIT.java | 74 +++++----- .../migration/GraphShardVersionMigrationIT.java | 148 +++++++++++-------- .../rx/AllEntitiesInSystemObservableIT.java | 23 ++- 8 files changed, 364 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java new file mode 100644 index 0000000..79d31a8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.migration; + + +import java.util.Iterator; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy; +import org.apache.usergrid.persistence.core.guice.CurrentImpl; +import org.apache.usergrid.persistence.core.guice.PreviousImpl; +import org.apache.usergrid.persistence.core.migration.data.DataMigration; +import org.apache.usergrid.persistence.core.migration.data.DataMigrationException; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import rx.functions.Action1; + + +/** + * Migration for migrating graph edges to the new Shards + */ +public class EntityDataMigration implements DataMigration { + + + private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class ); + + + private final MvccEntitySerializationStrategy v1Serialization; + private final MvccEntitySerializationStrategy v2Serialization; + + private final ManagerCache managerCache; + private final Keyspace keyspace; + + + @Inject + public EntityDataMigration( @PreviousImpl final MvccEntitySerializationStrategy v1Serialization, + @CurrentImpl final MvccEntitySerializationStrategy v2Serialization, + final ManagerCache managerCache, final Keyspace keyspace ) { + this.v1Serialization = v1Serialization; + this.v2Serialization = v2Serialization; + this.managerCache = managerCache; + this.keyspace = keyspace; + } + + + @Override + public void migrate( final ProgressObserver observer ) throws Throwable { + + + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( + new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { + + + @Override + public void call( + final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) { + + + final UUID now = UUIDGenerator.newTimeUUID(); + + final Id appScopeId = applicationEntityGroup.applicationScope.getApplication(); + + + final MutationBatch totalBatch = keyspace.prepareMutationBatch(); + + for ( Id entityId : applicationEntityGroup.entityIds ) { + + CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType( + appScopeId, entityId.getType() ); + + + Iterator<MvccEntity> allVersions = + v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 ); + + while ( allVersions.hasNext() ) { + final MvccEntity version = allVersions.next(); + + final MutationBatch versionBatch = v2Serialization.write( currentScope, version ); + + totalBatch.mergeShallow( versionBatch ); + + if ( totalBatch.getRowCount() >= 50 ) { + try { + totalBatch.execute(); + } + catch ( ConnectionException e ) { + throw new DataMigrationException( "Unable to migrate batches ", e ); + } + } + } + } + + try { + totalBatch.execute(); + } + catch ( ConnectionException e ) { + throw new DataMigrationException( "Unable to migrate batches ", e ); + } + } + } ).toBlocking().last(); + } + + + @Override + public int getVersion() { + return Versions.VERSION_3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java index 1adfe73..8089dfd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java @@ -29,13 +29,10 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.migration.data.DataMigration; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; +import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; @@ -47,9 +44,6 @@ import rx.functions.Action1; */ public class EntityTypeMappingMigration implements DataMigration { - - private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class ); - private final ManagerCache managerCache; @@ -65,25 +59,27 @@ public class EntityTypeMappingMigration implements DataMigration { final AtomicLong atomicLong = new AtomicLong(); - AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache ) - .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 ) + .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { @Override - public void call( final AllEntitiesInSystemObservable.EntityData entityData ) { + public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) { - final MapScope ms = CpNamingUtils.getEntityTypeMapScope( entityData.applicationScope.getApplication() ); + final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() ); final MapManager mapManager = managerCache.getMapManager( ms ); - final UUID entityUuid = entityData.entityId.getUuid(); - final String entityType = entityData.entityId.getType(); + for(Id entityId: applicationEntityGroup.entityIds) { + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); - mapManager.putString( entityUuid.toString(), entityType ); + mapManager.putString( entityUuid.toString(), entityType ); - if ( atomicLong.incrementAndGet() % 100 == 0 ) { - updateStatus( atomicLong, observer ); + if ( atomicLong.incrementAndGet() % 100 == 0 ) { + updateStatus( atomicLong, observer ); + } } } } ).toBlocking().lastOrDefault( null ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java index ac4cd58..3b92570 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java @@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization; +import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; import com.netflix.astyanax.Keyspace; @@ -64,8 +65,7 @@ public class GraphShardVersionMigration implements DataMigration { @Inject public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization, - final ManagerCache managerCache, final - Keyspace keyspace ) { + final ManagerCache managerCache, final Keyspace keyspace ) { this.v2Serialization = v2Serialization; this.managerCache = managerCache; this.keyspace = keyspace; @@ -77,51 +77,71 @@ public class GraphShardVersionMigration implements DataMigration { final AtomicLong counter = new AtomicLong(); - AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache).flatMap( - new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).flatMap( + new Func1<AllEntitiesInSystemObservable.ApplicationEntityGroup, Observable<List<Edge>>>() { @Override - public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) { - logger.info( "Migrating edges from node {} in scope {}", entityData.entityId, - entityData.applicationScope ); - - final GraphManager gm = managerCache.getGraphManager( entityData.applicationScope ); - - //get each edge from this node as a source - return EdgesFromSourceObservable.edgesFromSource( gm, entityData.entityId ) - - //for each edge, re-index it in v2 every 1000 edges or less - .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() { - @Override - public void call( final List<Edge> edges ) { - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( final Edge edge : edges ) { - logger.info( "Migrating meta for edge {}", edge ); - final MutationBatch edgeBatch = - v2Serialization.writeEdge( entityData.applicationScope, edge ); - batch.mergeShallow( edgeBatch ); - } - - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to perform migration", e ); - } - - //update the observer so the admin can see it - final long newCount = counter.addAndGet( edges.size() ); - - observer.update( getVersion(), String.format("Currently running. Rewritten %d edge types", newCount) ); - - - } - } ); + public Observable<List<Edge>> call( + final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) { + + //emit a stream of all ids from this group + return Observable.from( applicationEntityGroup.entityIds ) + .flatMap( new Func1<Id, Observable<List<Edge>>>() { + + + //for each id in the group, get it's edges + @Override + public Observable<List<Edge>> call( final Id id ) { + logger.info( "Migrating edges from node {} in scope {}", id, + applicationEntityGroup.applicationScope ); + + final GraphManager gm = managerCache + .getGraphManager( applicationEntityGroup.applicationScope ); + + //get each edge from this node as a source + return EdgesFromSourceObservable.edgesFromSource( gm, id ) + + //for each edge, re-index it in v2 every 1000 edges or less + .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() { + @Override + public void call( final List<Edge> edges ) { + + final MutationBatch batch = + keyspace.prepareMutationBatch(); + + for ( final Edge edge : edges ) { + logger.info( "Migrating meta for edge {}", edge ); + final MutationBatch edgeBatch = v2Serialization + .writeEdge( + applicationEntityGroup + .applicationScope, + edge ); + batch.mergeShallow( edgeBatch ); + } + + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( + "Unable to perform migration", e ); + } + + //update the observer so the admin can see it + final long newCount = + counter.addAndGet( edges.size() ); + + observer.update( getVersion(), String.format( + "Currently running. Rewritten %d edge types", + newCount ) ); + } + } ); + } + } ); } } ).toBlocking().lastOrDefault( null ); + ; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java index 291bbe9..771b81f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.rx; +import java.util.List; + import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; @@ -40,14 +42,17 @@ public class AllEntitiesInSystemObservable { /** * Return an observable that emits all entities in the system. + * @param managerCache the managerCache to use + * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size + * you may be more than 1 ApplicationEntityGroup with the same application and different ids */ - public static Observable<EntityData> getAllEntitiesInSystem( final ManagerCache managerCache) { + public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) { //traverse all nodes in the graph, load all source edges from them, then re-save the meta data return ApplicationObservable.getAllApplicationIds( managerCache ) - .flatMap( new Func1<Id, Observable<EntityData>>() { + .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() { @Override - public Observable<EntityData> call( final Id applicationId ) { + public Observable<ApplicationEntityGroup> call( final Id applicationId ) { //set up our application scope and graph manager final ApplicationScope applicationScope = new ApplicationScopeImpl( @@ -68,11 +73,11 @@ public class AllEntitiesInSystemObservable { //merge both the specified application node and the entity node // so they all get used - return Observable.merge( applicationNode, entityNodes ) - .map( new Func1<Id, EntityData>() { + return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize) + .map( new Func1<List<Id>, ApplicationEntityGroup>() { @Override - public EntityData call( final Id id ) { - return new EntityData( applicationScope, id ); + public ApplicationEntityGroup call( final List<Id> id ) { + return new ApplicationEntityGroup( applicationScope, id ); } } ); } @@ -83,14 +88,14 @@ public class AllEntitiesInSystemObservable { /** * Get the entity data. Immutable bean for fast access */ - public static final class EntityData { + public static final class ApplicationEntityGroup { public final ApplicationScope applicationScope; - public final Id entityId; + public final List<Id> entityIds; - public EntityData( final ApplicationScope applicationScope, final Id entityId ) { + public ApplicationEntityGroup( final ApplicationScope applicationScope, final List<Id> entityIds ) { this.applicationScope = applicationScope; - this.entityId = entityId; + this.entityIds = entityIds; } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index fa9f9df..9d0c9e6 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import com.fasterxml.uuid.UUIDComparator; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType; import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -220,9 +221,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { EntityManager em = app.getEntityManager(); - CollectionScope cs = new CollectionScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), - new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), - CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) ); + CollectionScope cs = getCollectionScopeNameFromEntityType( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() ); EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java index dafdb00..1f0665a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java @@ -35,7 +35,6 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; import org.apache.usergrid.persistence.map.impl.MapSerializationImpl; import org.apache.usergrid.persistence.model.entity.Id; @@ -46,7 +45,6 @@ import rx.functions.Action1; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -75,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT { @Test public void testIdMapping() throws Throwable { - assertEquals("version 1 expected", 1, entityTypeMappingMigration.getVersion()); + assertEquals( "version 1 expected", 1, entityTypeMappingMigration.getVersion() ); final EntityManager newAppEm = app.getEntityManager(); @@ -87,7 +85,6 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT { final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size ); - final Set<Id> allEntities = new HashSet<>(); allEntities.addAll( type1Identities ); allEntities.addAll( type2Identities ); @@ -106,38 +103,45 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT { entityTypeMappingMigration.migrate( progressObserver ); - - - - AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ) - .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ) + .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { @Override - public void call( final AllEntitiesInSystemObservable.EntityData entity ) { + public void call( + final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) { //ensure that each one has a type - try { - - final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() ); - final Entity returned = em.get( entity.entityId.getUuid() ); - - //we seem to occasionally get phantom edges. If this is the case we'll store the type _> uuid mapping, but we won't have anything to load - if(returned != null) { - assertEquals( entity.entityId.getUuid(), returned.getUuid() ); - assertEquals( entity.entityId.getType(), returned.getType() ); - } - else { - final String type = managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope( - entity.applicationScope.getApplication() ) ) - .getString( entity.entityId.getUuid().toString() ); - - assertEquals(entity.entityId.getType(), type); - } - } - catch ( Exception e ) { - throw new RuntimeException( "Unable to get entity " + entity.entityId - + " by UUID, migration failed", e ); - } - allEntities.remove( entity.entityId ); + final EntityManager em = emf.getEntityManager( + entity.applicationScope.getApplication().getUuid() ); + + for ( final Id id : entity.entityIds ) { + try { + final Entity returned = em.get( id.getUuid() ); + + //we seem to occasionally get phantom edges. If this is the + // case we'll store the type _> uuid mapping, but we won't have + // anything to load + + if ( returned != null ) { + assertEquals( id.getUuid(), returned.getUuid() ); + assertEquals( id.getType(), returned.getType() ); + } + else { + final String type = managerCache.getMapManager( CpNamingUtils + .getEntityTypeMapScope( + entity.applicationScope.getApplication() ) ) + .getString( id.getUuid() + .toString() ); + + assertEquals( id.getType(), type ); + } + } + catch ( Exception e ) { + throw new RuntimeException( "Unable to get entity " + id + + " by UUID, migration failed", e ); + } + + allEntities.remove( id ); + } } } ).toBlocking().lastOrDefault( null ); @@ -145,9 +149,5 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT { assertEquals( "Every element should have been encountered", 0, allEntities.size() ); assertFalse( "Progress observer should not have failed", progressObserver.getFailed() ); assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 ); - - } - - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java index aab47a0..88c02cd 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java @@ -32,12 +32,9 @@ import org.apache.usergrid.corepersistence.EntityWriteHelper; import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager; import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl; import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization; -import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl; -import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; @@ -114,40 +111,41 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT { //read everything in previous version format and put it into our types. - AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ) - .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000) + .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { @Override - public void call( final AllEntitiesInSystemObservable.EntityData entity ) { + public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) { final GraphManager gm = managerCache.getGraphManager( entity.applicationScope ); - /** - * Get our edge types from the source - */ - gm.getEdgeTypesFromSource( - new SimpleSearchEdgeType( entity.entityId, null, null ) ) - .doOnNext( new Action1<String>() { - @Override - public void call( final String s ) { - sourceTypes.put( entity.entityId, s ); - } - } ).toBlocking().lastOrDefault( null ); - - - /** - * Get the edge types to the target - */ - gm.getEdgeTypesToTarget( - new SimpleSearchEdgeType( entity.entityId, null, null ) ) - .doOnNext( new Action1<String>() { - @Override - public void call( final String s ) { - targetTypes.put( entity.entityId, s ); - } - } ).toBlocking().lastOrDefault( null ); - - allEntities.remove( entity.entityId ); + for(final Id id: entity.entityIds) { + /** + * Get our edge types from the source + */ + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( id, null, null ) ) + .doOnNext( new Action1<String>() { + @Override + public void call( final String s ) { + sourceTypes.put( id, s ); + } + } ).toBlocking().lastOrDefault( null ); + + + /** + * Get the edge types to the target + */ + gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id, + null, null ) ) + .doOnNext( new Action1<String>() { + @Override + public void call( final String s ) { + targetTypes.put( id, s ); + } + } ).toBlocking().lastOrDefault( null ); + + allEntities.remove( id ); + } } } ).toBlocking().lastOrDefault( null ); @@ -169,42 +167,66 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT { //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end - AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ) - .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ) + .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { @Override - public void call( final AllEntitiesInSystemObservable.EntityData entity ) { + public void call( + final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) { final GraphManager gm = managerCache.getGraphManager( entity.applicationScope ); - /** - * Get our edge types from the source - */ - gm.getEdgeTypesFromSource( - new SimpleSearchEdgeType( entity.entityId, null, null ) ) - .doOnNext( new Action1<String>() { - @Override - public void call( final String s ) { - sourceTypes.remove( entity.entityId, s ); - } - } ).toBlocking().lastOrDefault( null ); - - - /** - * Get the edge types to the target - */ - gm.getEdgeTypesToTarget( - new SimpleSearchEdgeType( entity.entityId, null, null ) ) - .doOnNext( new Action1<String>() { - @Override - public void call( final String s ) { - targetTypes.remove( entity.entityId, s ); - } - } ).toBlocking().lastOrDefault( null ); + for ( final Id id : entity.entityIds ) { + /** + * Get our edge types from the source + */ + gm.getEdgeTypesFromSource( + new SimpleSearchEdgeType( id, null, null ) ) + .doOnNext( new Action1<String>() { + @Override + public void call( final String s ) { + sourceTypes.remove( id, s ); + } + } ).toBlocking().lastOrDefault( null ); + + + /** + * Get the edge types to the target + */ + gm.getEdgeTypesToTarget( + new SimpleSearchEdgeType( id, null, null ) ) + .doOnNext( new Action1<String>() { + @Override + public void call( final String s ) { + targetTypes.remove( id, s ); + } + } ).toBlocking().lastOrDefault( null ); + } + } } - } ).toBlocking().lastOrDefault( null ); - assertEquals( "All source types migrated", 0, sourceTypes.size() ); - assertEquals( "All target types migrated", 0, targetTypes.size() ); + + ). + + + toBlocking() + + + . + + + lastOrDefault( null ); + + + assertEquals( "All source types migrated",0,sourceTypes.size( ) + + + ); + + + assertEquals( "All target types migrated",0,targetTypes.size( ) + + + ); + } } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java index 423dc1f..4d1c6c9 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.rx; -import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -33,13 +32,11 @@ import org.apache.usergrid.corepersistence.CpSetup; import org.apache.usergrid.corepersistence.EntityWriteHelper; import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; import rx.functions.Action1; @@ -95,26 +92,28 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT { final GraphManager gm = managerCache.getGraphManager( scope ); - AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() { + AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() { @Override - public void call( final AllEntitiesInSystemObservable.EntityData entity ) { + public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) { assertNotNull(entity); assertNotNull(entity.applicationScope); - assertNotNull(entity.entityId); + assertNotNull(entity.entityIds); //not from our test, don't check it if(!applicationId.equals( entity.applicationScope.getApplication() )){ return; } + for(Id id: entity.entityIds) { - //we should only emit each node once - if ( entity.entityId.getType().equals( type1 ) ) { - assertTrue( "Element should be present on removal", type1Identities.remove( entity.entityId ) ); - } - else if ( entity.entityId.getType().equals( type2 ) ) { - assertTrue( "Element should be present on removal", type2Identities.remove( entity.entityId ) ); + //we should only emit each node once + if ( id.getType().equals( type1 ) ) { + assertTrue( "Element should be present on removal", type1Identities.remove( id ) ); + } + else if ( id.getType().equals( type2 ) ) { + assertTrue( "Element should be present on removal", type2Identities.remove( id ) ); + } } } } ).toBlocking().lastOrDefault( null );
