First pass at upgrading to java 8 and latest RX java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/282e2271 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/282e2271 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/282e2271 Branch: refs/heads/USERGRID-486 Commit: 282e22712890cdda0439a5694810cff632526d7b Parents: 72ec19d Author: Todd Nine <[email protected]> Authored: Thu Mar 19 18:00:57 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu Mar 19 18:00:57 2015 -0600 ---------------------------------------------------------------------- stack/core/pom.xml | 26 +-- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/CpEntityManagerFactory.java | 7 +- .../corepersistence/CpRelationManager.java | 24 +-- .../usergrid/corepersistence/CpWalker.java | 77 +++----- .../events/EntityVersionDeletedHandler.java | 72 +++---- .../migration/EntityTypeMappingMigration.java | 41 ++-- .../migration/EntityTypeMappingMigrationIT.java | 2 +- .../impl/EntityCollectionManagerImpl.java | 10 +- .../collection/impl/EntityDeletedTask.java | 20 +- .../impl/EntityVersionCleanupTask.java | 27 +-- .../impl/EntityVersionCreatedTask.java | 26 +-- .../MvccEntitySerializationStrategyImpl.java | 89 +++------ .../MvccEntitySerializationStrategyV3Impl.java | 91 +++------ .../migration/MvccEntityDataMigrationImpl.java | 169 +++++++--------- .../persistence/collection/rx/ParallelTest.java | 10 +- ...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 2 +- stack/corepersistence/common/pom.xml | 15 +- .../astyanax/MultiKeyColumnNameIterator.java | 4 +- .../MultiKeyColumnNameIteratorTest.java | 187 ++++++++---------- .../astyanax/MultiRowColumnIteratorTest.java | 50 ++--- .../graph/impl/GraphManagerImpl.java | 6 +- .../graph/impl/stage/EdgeMetaRepairImpl.java | 2 + .../impl/stage/NodeDeleteListenerImpl.java | 2 +- .../impl/migration/EdgeDataMigrationImpl.java | 87 ++++----- .../graph/GraphManagerShardConsistencyIT.java | 2 +- .../usergrid/persistence/graph/SimpleTest.java | 12 +- .../migration/EdgeDataMigrationImplTest.java | 2 +- stack/corepersistence/pom.xml | 8 +- .../index/impl/IndexLoadTestsIT.java | 105 ++++------ stack/pom.xml | 8 +- .../management/importer/ImportServiceImpl.java | 34 ++-- .../impl/ApplicationQueueManagerImpl.java | 195 +++++++++---------- .../setup/ConcurrentProcessSingleton.java | 16 +- 34 files changed, 604 insertions(+), 826 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index 971ee62..119a52b 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -130,15 +130,7 @@ </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.3.2</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> + <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -481,17 +473,11 @@ <version>${metrics.version}</version> </dependency> - <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-core</artifactId> - <version>${rx.version}</version> - </dependency> - - <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-math</artifactId> - <version>${rx.version}</version> - </dependency> + <dependency> + <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + <version>${rx.version}</version> + </dependency> <dependency> <groupId>com.clearspring.analytics</groupId> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 789e640..9cffdaf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -1097,7 +1097,7 @@ public class CpEntityManager implements EntityManager { } ); //TODO: does this call and others like it need a graphite reporter? - cpEntity = ecm.write( cpEntity ).toBlockingObservable().last(); + cpEntity = ecm.write( cpEntity ).toBlocking().last(); logger.debug( "Wrote {}:{} version {}", new Object[] { cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index f76b9fc..83c3d85 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -451,7 +451,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application fromEntityId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null )); - Iterator<Edge> iter = edges.toBlockingObservable().getIterator(); + //TODO This is wrong, and will result in OOM if there are too many applications. This needs to stream properly with a buffer + Iterator<Edge> iter = edges.toBlocking().getIterator(); while ( iter.hasNext() ) { Edge edge = iter.next(); @@ -469,7 +470,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application org.apache.usergrid.persistence.model.entity.Entity e = managerCache.getEntityCollectionManager( collScope ).load( targetId ) - .toBlockingObservable().lastOrDefault(null); + .toBlocking().lastOrDefault(null); if ( e == null ) { logger.warn("Applicaion {} in index but not found in collections", targetId ); @@ -624,7 +625,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application public long performEntityCount() { //TODO, this really needs to be a task that writes this data somewhere since this will get //progressively slower as the system expands - return (Long) getAllEntitiesObservable().longCount().toBlocking().last(); + return (Long) getAllEntitiesObservable().countLong().toBlocking().last(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 2eeee28..c4e970d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -280,7 +280,7 @@ public class CpRelationManager implements RelationManager { Observable<String> types= gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null )); - Iterator<String> iter = types.toBlockingObservable().getIterator(); + Iterator<String> iter = types.toBlocking().getIterator(); while ( iter.hasNext() ) { indexes.add( iter.next() ); } @@ -346,7 +346,7 @@ public class CpRelationManager implements RelationManager { Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null )); - Iterator<Edge> iter = edges.toBlockingObservable().getIterator(); + Iterator<Edge> iter = edges.toBlocking().getIterator(); while ( iter.hasNext() ) { Edge edge = iter.next(); @@ -383,7 +383,7 @@ public class CpRelationManager implements RelationManager { final GraphManager gm = managerCache.getGraphManager( applicationScope ); Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( - cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator(); + cpHeadEntity.getId(), null, null) ).toBlocking().getIterator(); logger.debug("updateContainingCollectionsAndCollections(): " + "Searched for edges to target {}:{}\n in scope {}\n found: {}", @@ -484,7 +484,7 @@ public class CpRelationManager implements RelationManager { SearchByEdgeType.Order.DESCENDING, null ) ); - return edges.toBlockingObservable().firstOrDefault( null ) != null; + return edges.toBlocking().firstOrDefault( null ) != null; } @@ -511,7 +511,7 @@ public class CpRelationManager implements RelationManager { SearchByEdgeType.Order.DESCENDING, null ) ); - return edges.toBlockingObservable().firstOrDefault( null ) != null; + return edges.toBlocking().firstOrDefault( null ) != null; } @@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager { SearchByEdgeType.Order.DESCENDING, null ) ); // last - Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator(); + Iterator<Edge> iterator = edgesToTarget.toBlocking().getIterator(); int count = 0; while ( iterator.hasNext() ) { iterator.next(); @@ -569,7 +569,7 @@ public class CpRelationManager implements RelationManager { Observable<String> str = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ); - Iterator<String> iter = str.toBlockingObservable().getIterator(); + Iterator<String> iter = str.toBlocking().getIterator(); while ( iter.hasNext() ) { String edgeType = iter.next(); indexes.add( CpNamingUtils.getCollectionName( edgeType ) ); @@ -692,7 +692,7 @@ public class CpRelationManager implements RelationManager { // create graph edge connection from head entity to member entity Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - gm.writeEdge( edge ).toBlockingObservable().last(); + gm.writeEdge( edge ).toBlocking().last(); logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] { @@ -855,7 +855,7 @@ public class CpRelationManager implements RelationManager { cpHeadEntity.getId(), CpNamingUtils.getEdgeTypeFromCollectionName( collName ), memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) ); - gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last(); + gm.deleteEdge( collectionToItemEdge ).toBlocking().last(); // remove edge from item to collection Edge itemToCollectionEdge = new SimpleEdge( @@ -865,7 +865,7 @@ public class CpRelationManager implements RelationManager { cpHeadEntity.getId(), UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) ); - gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last(); + gm.deleteEdge( itemToCollectionEdge ).toBlocking().last(); // special handling for roles collection of a group if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) { @@ -1058,7 +1058,7 @@ public class CpRelationManager implements RelationManager { cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - gm.writeEdge( edge ).toBlockingObservable().last(); + gm.writeEdge( edge ).toBlocking().last(); EntityIndex ei = managerCache.getEntityIndex( applicationScope ); EntityIndexBatch batch = ei.createBatch(); @@ -1290,7 +1290,7 @@ public class CpRelationManager implements RelationManager { System.currentTimeMillis() ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - gm.deleteEdge( edge ).toBlockingObservable().last(); + gm.deleteEdge( edge ).toBlocking().last(); final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); final EntityIndexBatch batch = ei.createBatch(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java index 4b902d8..332d5a8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java @@ -104,53 +104,38 @@ public class CpWalker { Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( applicationId, edgeType, null ) ); - edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { - @Override - public Observable<Edge> call( final String edgeType ) { - - logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId ); - - return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - applicationId, edgeType, Long.MAX_VALUE, order , null ) ); - - } - - } ).parallel( new Func1<Observable<Edge>, Observable<Edge>>() { - - @Override - public Observable<Edge> call( final Observable<Edge> edgeObservable ) { // process edges in parallel - return edgeObservable.doOnNext( new Action1<Edge>() { // visit and update then entity - - @Override - public void call( Edge edge ) { - - logger.info( "Re-indexing edge {}", edge ); - - EntityRef targetNodeEntityRef = new SimpleEntityRef( - edge.getTargetNode().getType(), - edge.getTargetNode().getUuid() ); - - Entity entity; - try { - entity = em.get( targetNodeEntityRef ); - } - catch ( Exception ex ) { - logger.error( "Error getting sourceEntity {}:{}, continuing", - targetNodeEntityRef.getType(), - targetNodeEntityRef.getUuid() ); - return; - } - if(entity == null){ - return; - } - String collName = CpNamingUtils.getCollectionName( edge.getType() ); - visitor.visitCollectionEntry( em, collName, entity ); - } - } ); - } - }, Schedulers.io() ) + edgeTypes.flatMap( emittedEdgeType -> { + + logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId ); + + return gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( applicationId, emittedEdgeType, Long.MAX_VALUE, order, null ) ); + } ).flatMap( edge -> { + //run each edge through it's own scheduler, up to 100 at a time + return Observable.just( edge ).doOnNext( edgeValue -> { + logger.info( "Re-indexing edge {}", edgeValue ); + + EntityRef targetNodeEntityRef = + new SimpleEntityRef( edgeValue.getTargetNode().getType(), edgeValue.getTargetNode().getUuid() ); + + Entity entity; + try { + entity = em.get( targetNodeEntityRef ); + } + catch ( Exception ex ) { + logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(), + targetNodeEntityRef.getUuid() ); + return; + } + if ( entity == null ) { + return; + } + String collName = CpNamingUtils.getCollectionName( edgeValue.getType() ); + visitor.visitCollectionEntry( em, collName, entity ); + } ).subscribeOn( Schedulers.io() ); + }, 100 ); // wait for it to complete - .toBlocking().lastOrDefault( null ); // end foreach on edges + edgeTypes.toBlocking().lastOrDefault( null ); // end foreach on edges } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java index c45949b..23f5a32 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java @@ -17,53 +17,48 @@ */ package org.apache.usergrid.corepersistence.events; -import com.google.inject.Inject; -import com.google.inject.Singleton; import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.usergrid.corepersistence.CpEntityManagerFactory; -import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.collection.CollectionScope; -import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func1; -import rx.schedulers.Schedulers; + +import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; /** - * Remove Entity index when specific version of Entity is deleted. - * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this? - * If we do need it then it should be wired in via GuiceModule in the corepersistence package. + * Remove Entity index when specific version of Entity is deleted. TODO: do we need this? Don't our version-created and + * entity-deleted handlers take care of this? If we do need it then it should be wired in via GuiceModule in the + * corepersistence package. */ @Singleton public class EntityVersionDeletedHandler implements EntityVersionDeleted { - private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class ); - - + private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class ); private final EntityManagerFactory emf; + @Inject public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;} - @Override public void versionDeleted( final CollectionScope scope, final Id entityId, final List<MvccLogEntry> entityVersions ) { @@ -71,40 +66,33 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { // This check is for testing purposes and for a test that to be able to dynamically turn // off and on delete previous versions so that it can test clean-up on read. - if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) { + if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) { return; } - if(logger.isDebugEnabled()) { - logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n name: {}\n owner: {}\n app: {}", - new Object[] { + if ( logger.isDebugEnabled() ) { + logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + + "scope\n name: {}\n owner: {}\n app: {}", new Object[] { entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(), scope.getApplication() } ); } - CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf; + CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); - final IndexScope indexScope = new IndexScopeImpl( - new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()), - scope.getName() - ); - - Observable.from( entityVersions ) - .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() { - @Override - public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) { - entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() ); - } - } ).doOnNext( new Action1<EntityIndexBatch>() { - @Override - public void call( final EntityIndexBatch entityIndexBatch ) { + final IndexScope indexScope = + new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ), + scope.getName() ); + + //create our batch, and then collect all of them into a single batch + Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> { + entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() ); + } ) + //after our batch is collected, execute it + .doOnNext( entityIndexBatch -> { entityIndexBatch.execute(); - } - } ).toBlocking().last(); + } ).toBlocking().last(); } - - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java index 40ad236..6531d16 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java @@ -37,6 +37,7 @@ import com.google.inject.Inject; import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; +import rx.schedulers.Schedulers; /** @@ -63,36 +64,26 @@ public class EntityTypeMappingMigration implements DataMigration<EntityIdScope> final AtomicLong atomicLong = new AtomicLong(); - allEntitiesInSystemObservable.getData() - //process the entities in parallel - .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() { + //migrate up to 100 types simultaneously + allEntitiesInSystemObservable.getData().flatMap( entityIdScope -> { + return Observable.just( entityIdScope ).doOnNext( entityIdScopeObservable -> { + final MapScope ms = CpNamingUtils + .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() ); + final MapManager mapManager = managerCache.getMapManager( ms ); - @Override - public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) { + final UUID entityUuid = entityIdScope.getId().getUuid(); + final String entityType = entityIdScope.getId().getType(); - //for each entity observable, get the map scope and write it to the map - return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() { - @Override - public void call( final EntityIdScope entityIdScope ) { - final MapScope ms = CpNamingUtils - .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() ); + mapManager.putString( entityUuid.toString(), entityType ); - final MapManager mapManager = managerCache.getMapManager( ms ); + if ( atomicLong.incrementAndGet() % 100 == 0 ) { + observer.update( getMaxVersion(), + String.format( "Updated %d entities", atomicLong.get() ) ); + } - final UUID entityUuid = entityIdScope.getId().getUuid(); - final String entityType = entityIdScope.getId().getType(); - - mapManager.putString( entityUuid.toString(), entityType ); - - if ( atomicLong.incrementAndGet() % 100 == 0 ) { - observer.update( getMaxVersion(), - String.format( "Updated %d entities", atomicLong.get() ) ); - } - } - } ); - } - } ).count().toBlocking().last(); + } ).subscribeOn( Schedulers.io() ); + }, 100 ).count().toBlocking().last(); return getMaxVersion(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java index be7cee4..88f56c8 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java @@ -77,7 +77,7 @@ public class EntityTypeMappingMigrationIT { final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP ); - final Observable<EntityIdScope> scopes = Observable.from(idScope1, idScope2); + final Observable<EntityIdScope> scopes = Observable.just(idScope1, idScope2); final TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index f565fab..70b5a3a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -235,7 +235,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); final Timer.Context timer = deleteTimer.time(); - Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)) + Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( collectionScope, entityId ) ) .map(markStart) .doOnNext( markCommit ) .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() { @@ -284,7 +284,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { return Observable.empty(); } - return Observable.from(entity.getEntity().get()); + return Observable.just( entity.getEntity().get() ); } }) .doOnNext( new Action1<Entity>() { @@ -449,19 +449,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) { - return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() { + return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() { @Override public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { Observable<CollectionIoEvent<MvccEntity>> unique = - Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) .doOnNext( writeVerifyUnique ); // optimistic verification Observable<CollectionIoEvent<MvccEntity>> optimistic = - Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) .doOnNext( writeOptimisticVerify ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java index 5472645..7620907 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java @@ -127,22 +127,10 @@ public class EntityDeletedTask implements Task<Void> { LOG.debug( "Started firing {} listeners", listenerSize ); - //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time - Observable.from(listeners) - .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() { - - @Override - public Observable<EntityDeleted> call( - final Observable<EntityDeleted> entityVersionDeletedObservable ) { - - return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() { - @Override - public void call( final EntityDeleted listener ) { - listener.deleted(collectionScope, entityId, version); - } - } ); - } - }, Schedulers.io() ).toBlocking().last(); + //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time + Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { + listener.deleted( collectionScope, entityId, version ); + } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); LOG.debug( "Finished firing {} listeners", listenerSize ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java index b245528..1a7b86b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java @@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> { throw new RuntimeException( "Unable to execute batch mutation", e ); } } - } ).subscribeOn( Schedulers.io() ).longCount().toBlocking(); + } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); //start calling the listeners for remove log entries @@ -201,7 +201,7 @@ public class EntityVersionCleanupTask implements Task<Void> { throw new RuntimeException( "Unable to execute batch mutation", e ); } } - } ).subscribeOn( Schedulers.io() ).longCount().toBlocking(); + } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); //wait or this to complete final Long removedCount = uniqueValueCleanup.last(); @@ -232,21 +232,14 @@ public class EntityVersionCleanupTask implements Task<Void> { logger.debug( "Started firing {} listeners", listenerSize ); //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time - Observable.from( listeners ) - .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() { - - @Override - public Observable<EntityVersionDeleted> call( - final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) { - - return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() { - @Override - public void call( final EntityVersionDeleted listener ) { - listener.versionDeleted( scope, entityId, versions ); - } - } ); - } - }, Schedulers.io() ).toBlocking().last(); + + + //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time + Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { + listener.versionDeleted( scope, entityId, versions ); + } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); + + logger.debug( "Finished firing {} listeners", listenerSize ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java index 7d3beb1..16a6e77 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java @@ -64,7 +64,7 @@ public class EntityVersionCreatedTask implements Task<Void> { @Override public Void rejected() { - // Our task was rejected meaning our queue was full. + // Our task was rejected meaning our queue was full. // We need this operation to run, so we'll run it in our current thread try { call(); @@ -76,7 +76,7 @@ public class EntityVersionCreatedTask implements Task<Void> { return null; } - + @Override public Void call() throws Exception { @@ -100,22 +100,12 @@ public class EntityVersionCreatedTask implements Task<Void> { logger.debug( "Started firing {} listeners", listenerSize ); - //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time - Observable.from(listeners).parallel( - new Func1<Observable<EntityVersionCreated>, Observable<EntityVersionCreated>>() { - - @Override - public Observable<EntityVersionCreated> call( - final Observable<EntityVersionCreated> entityVersionCreatedObservable ) { - - return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() { - @Override - public void call( final EntityVersionCreated listener ) { - listener.versionCreated(collectionScope,entity); - } - } ); - } - }, Schedulers.io() ).toBlocking().last(); + + Observable.from( listeners ) + .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { + listener.versionCreated( collectionScope, entity ); + } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); + logger.debug( "Finished firing {} listeners", listenerSize ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java index e1445e3..ad1d91a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java @@ -176,77 +176,52 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS } - final EntitySetImpl entitySetResults = Observable.from( rowKeys ) - //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary) - .buffer(entitiesPerRequest ) - .parallel( new Func1<Observable<List<ScopedRowKey - <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() { + final EntitySetImpl entitySetResults = Observable.from( rowKeys ) + //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary) + .buffer( entitiesPerRequest ).flatMap( listObservable -> { - @Override - public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call( - final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) { - - - //here, we execute our query then emit the items either in parallel, or on the current thread if we have more than 1 request - return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>, - Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() { - + //here, we execute our query then emit the items either in parallel, or on the current thread + // if we have more than 1 request + return Observable.just( listObservable ).map( scopedRowKeys -> { - @Override - public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call( - final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) { - - try { - return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys ) - .withColumnRange( maxVersion, null, false, - 1 ).execute().getResult(); - } - catch ( ConnectionException e ) { - throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", - e ); - } + try { + return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys ) + .withColumnRange( maxVersion, null, false, 1 ).execute().getResult(); } - } ); - - - - } - }, scheduler ) - - //reduce all the output into a single Entity set - .reduce( new EntitySetImpl( entityIds.size() ), - new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() { - @Override - public EntitySetImpl call( final EntitySetImpl entitySet, - final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) { - - final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator(); + catch ( ConnectionException e ) { + throw new CollectionRuntimeException( null, collectionScope, + "An error occurred connecting to cassandra", e ); + } + } ).subscribeOn( scheduler ); + }, 10 ) - while ( latestEntityColumns.hasNext() ) { - final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next(); + .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> { + final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = + rows.iterator(); - final ColumnList<UUID> columns = row.getColumns(); + while ( latestEntityColumns.hasNext() ) { + final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next(); - if ( columns.size() == 0 ) { - continue; - } + final ColumnList<UUID> columns = row.getColumns(); - final Id entityId = row.getKey().getKey().getSubKey(); + if ( columns.size() == 0 ) { + continue; + } - final Column<UUID> column = columns.getColumnByIndex( 0 ); + final Id entityId = row.getKey().getKey().getSubKey(); - final MvccEntity parsedEntity = - new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column ); + final Column<UUID> column = columns.getColumnByIndex( 0 ); - entitySet.addEntity( parsedEntity ); - } + final MvccEntity parsedEntity = + new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column ); + entitySet.addEntity( parsedEntity ); + } - return entitySet; - } - } ).toBlocking().last(); + return entitySet; + } ).toBlocking().last(); return entitySetResults; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java index a5046f6..de959b5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java @@ -185,82 +185,55 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ final EntitySetImpl entitySetResults = Observable.from( rowKeys ) - //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary) - .buffer( entitiesPerRequest ).parallel( - new Func1<Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>>, - Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>>() { + //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary) + .buffer( entitiesPerRequest ).flatMap( listObservable -> { - @Override - public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> call( - final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) { + //here, we execute our query then emit the items either in parallel, or on the current thread + // if we have more than 1 request + return Observable.just( listObservable ).map( scopedRowKeys -> { - //here, we execute our query then emit - // the items either in parallel, or on - // the current thread if we have more - // than 1 request - return listObservable - .map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>, - Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>() { + try { + return keyspace.prepareQuery( CF_ENTITY_DATA ).getKeySlice( rowKeys ) + .withColumnSlice( COL_VALUE ).execute().getResult(); + } + catch ( ConnectionException e ) { + throw new CollectionRuntimeException( null, collectionScope, + "An error occurred connecting to cassandra", e ); + } + } ).subscribeOn( scheduler ); + }, 10 ) + .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> { + final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns = + rows.iterator(); - @Override - public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> call( - final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys - ) { + while ( latestEntityColumns.hasNext() ) { + final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row = latestEntityColumns.next(); - try { - return keyspace.prepareQuery( CF_ENTITY_DATA ) - .getKeySlice( rowKeys ) - .withColumnSlice( COL_VALUE ) - .execute().getResult(); - } - catch ( ConnectionException e ) { - throw new CollectionRuntimeException( null, collectionScope, - "An error occurred connecting to cassandra", e ); - } - } - } ); - } - }, scheduler ) + final ColumnList<Boolean> columns = row.getColumns(); - //reduce all the output into a single Entity set - .reduce( new EntitySetImpl( entityIds.size() ), - new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>, EntitySetImpl>() { - @Override - public EntitySetImpl call( final EntitySetImpl entitySet, - final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> rows - ) { + if ( columns.size() == 0 ) { + continue; + } - final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns = - rows.iterator(); + final Id entityId = row.getKey().getKey().getSubKey(); - while ( latestEntityColumns.hasNext() ) { - final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row = - latestEntityColumns.next(); + final Column<Boolean> column = columns.getColumnByIndex( 0 ); - final ColumnList<Boolean> columns = row.getColumns(); + final MvccEntity parsedEntity = + new MvccColumnParser( entityId, entitySerializer ).parseColumn( column ); - if ( columns.size() == 0 ) { - continue; - } - final Id entityId = row.getKey().getKey().getSubKey(); + entitySet.addEntity( parsedEntity ); + } - final Column<Boolean> column = columns.getColumnByIndex( 0 ); - final MvccEntity parsedEntity = - new MvccColumnParser( entityId, entitySerializer ).parseColumn( column ); + return entitySet; + } ).toBlocking().last(); - entitySet.addEntity( parsedEntity ); - } - - - return entitySet; - } - } ).toBlocking().last(); return entitySetResults; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java index f87b5fd..6982857 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java @@ -124,130 +124,107 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> final Observable<List<EntityToSaveMessage>> migrated = - migrationDataProvider.getData().subscribeOn( Schedulers.io() ).parallel( - new Func1<Observable<EntityIdScope>, Observable<List<EntityToSaveMessage>>>() { + migrationDataProvider.getData().subscribeOn( Schedulers.io() ).flatMap( entityToSaveList -> Observable.just( entityToSaveList ).flatMap( entityIdScope -> { + //load the entity + final CollectionScope currentScope = entityIdScope.getCollectionScope(); - //process the ids in parallel - @Override - public Observable<List<EntityToSaveMessage>> call( - final Observable<EntityIdScope> entityIdScopeObservable ) { - - - return entityIdScopeObservable.flatMap( - new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() { - - - @Override - public Observable<EntityToSaveMessage> call( final EntityIdScope entityIdScope ) { - //load the entity - final CollectionScope currentScope = entityIdScope.getCollectionScope(); + //for each element in our + // history, we need to copy it + // to v2. + // Note that + // this migration + //won't support anything beyond V2 + final Iterator<MvccEntity> allVersions = + migration.from.loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 ); - //for each element in our - // history, we need to copy it - // to v2. - // Note that - // this migration - //won't support anything beyond V2 - - final Iterator<MvccEntity> allVersions = migration.from - .loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 ); + //emit all the entity versions + return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() { + @Override + public void call( final Subscriber<? super + EntityToSaveMessage> subscriber ) { - //emit all the entity versions - return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() { - @Override - public void call( final Subscriber<? super - EntityToSaveMessage> subscriber ) { + while ( allVersions.hasNext() ) { + final EntityToSaveMessage message = + new EntityToSaveMessage( currentScope, allVersions.next() ); + subscriber.onNext( message ); + } - while ( allVersions.hasNext() ) { - final EntityToSaveMessage message = new EntityToSaveMessage( currentScope, allVersions.next() ); - subscriber.onNext( message ); - } + subscriber.onCompleted(); + } + } ).buffer( 100 ).doOnNext( entities -> { - subscriber.onCompleted(); - } - } ); - } - } ) - //buffer 10 versions - .buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() { - @Override - public void call( final List<EntityToSaveMessage> entities ) { + final MutationBatch totalBatch = keyspace.prepareMutationBatch(); - final MutationBatch totalBatch = keyspace.prepareMutationBatch(); + atomicLong.addAndGet( entities.size() ); - atomicLong.addAndGet( entities.size() ); + List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() ); - List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList(entities.size()); + for ( EntityToSaveMessage message : entities ) { + final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity ); - for ( EntityToSaveMessage message : entities ) { - final MutationBatch entityRewrite = - migration.to.write( message.scope, message.entity ); + //add to + // the + // total + // batch + totalBatch.mergeShallow( entityRewrite ); - //add to - // the - // total - // batch - totalBatch.mergeShallow( entityRewrite ); + //write + // the + // unique values - //write - // the - // unique values + if ( !message.entity.getEntity().isPresent() ) { + return; + } - if ( !message.entity.getEntity().isPresent() ) { - return; - } + final Entity entity = message.entity.getEntity().get(); - final Entity entity = message.entity.getEntity().get(); + final Id entityId = entity.getId(); - final Id entityId = entity.getId(); + final UUID version = message.entity.getVersion(); - final UUID version = message.entity.getVersion(); + // re-write the unique + // values + // but this + // time with + // no TTL so that cleanup can clean up + // older values + for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) { - // re-write the unique - // values - // but this - // time with - // no TTL so that cleanup can clean up - // older values - for ( Field field : EntityUtils - .getUniqueFields( message.entity.getEntity().get() ) ) { + UniqueValue written = new UniqueValueImpl( field, entityId, version ); - UniqueValue written = new UniqueValueImpl( field, entityId, version ); + MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written ); - MutationBatch mb = - uniqueValueSerializationStrategy.write( message.scope, written ); + // merge into our + // existing mutation + // batch + totalBatch.mergeShallow( mb ); + } - // merge into our - // existing mutation - // batch - totalBatch.mergeShallow( mb ); - } + final EntityVersionCleanupTask task = entityVersionCleanupFactory + .getCleanupTask( message.scope, message.entity.getId(), version, false ); - final EntityVersionCleanupTask task = entityVersionCleanupFactory.getCleanupTask( message.scope, message.entity.getId(), version, false ); + entityVersionCleanupTasks.add( task ); + } - entityVersionCleanupTasks.add( task ); - } + executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong ); - executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong ); + //now run our cleanup task - //now run our cleanup task + for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) { + try { + entityVersionCleanupTask.call(); + } + catch ( Exception e ) { + LOGGER.error( "Unable to run cleanup task", e ); + } + } + } ).subscribeOn( Schedulers.io() ); - for(EntityVersionCleanupTask entityVersionCleanupTask: entityVersionCleanupTasks){ - try { - entityVersionCleanupTask.call(); - } - catch ( Exception e ) { - LOGGER.error( "Unable to run cleanup task", e ); - } - } - } - } ); - } - } ); + }, 10) ); migrated.toBlocking().lastOrDefault(null); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java index 2d416a4..a49e533 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java @@ -64,7 +64,7 @@ public class ParallelTest { final int expected = size - 1; - // QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command + // QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command // happens on the computation Thread if this is used // final Scheduler scheduler = Schedulers.threadPoolForComputation(); @@ -90,7 +90,7 @@ public class ParallelTest { * non blocking? */ - final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() ); + final Observable<String> observable = Observable.just( input ).observeOn( Schedulers.io() ); Observable<Integer> thing = observable.flatMap( new Func1<String, Observable<Integer>>() { @@ -99,7 +99,7 @@ public class ParallelTest { public Observable<Integer> call( final String s ) { List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>(); - logger.info( "Creating new set of observables in thread {}", + logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() ); for ( int i = 0; i < size; i++ ) { @@ -107,13 +107,13 @@ public class ParallelTest { final int index = i; - // create a new observable and execute the function on it. + // create a new observable and execute the function on it. // These should happen in parallel when a subscription occurs /** * QUESTION: Should this again be the process thread, not the I/O */ - Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() ); + Observable<String> newObservable = Observable.just( input ).subscribeOn( Schedulers.io() ); Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java index 747ea7b..9938caf 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java @@ -119,7 +119,7 @@ public abstract class AbstractMvccEntityDataMigrationV1ToV3ImplTest implements D assertEquals( "Same entity", entity2, returned2 ); final Observable<EntityIdScope> entityIdScope = - Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) ); + Observable.just( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) ); final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml index 82df1d8..2be1c1a 100644 --- a/stack/corepersistence/common/pom.xml +++ b/stack/corepersistence/common/pom.xml @@ -101,15 +101,16 @@ <!-- RX java --> + <dependency> + <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + <version>${rx.version}</version> + </dependency> + <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-core</artifactId> - <version>${rx.version}</version> - </dependency> - <dependency> - <groupId>com.netflix.rxjava</groupId> + <groupId>io.reactivex</groupId> <artifactId>rxjava-math</artifactId> - <version>${rx.version}</version> + <version>1.0.0</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java index 15f9aab..23661ee 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java @@ -77,7 +77,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T for ( ColumnNameIterator<C, T> columnNameIterator : columnNameIterators ) { - observables[i] = Observable.from( columnNameIterator, Schedulers.io() ); + + + observables[i] = Observable.from( columnNameIterator ).subscribeOn( Schedulers.io() ); i++; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java index f4f6f9c..3c56763 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java @@ -52,6 +52,7 @@ import com.netflix.astyanax.util.RangeBuilder; import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; +import rx.schedulers.Schedulers; import static org.junit.Assert.assertEquals; @@ -125,100 +126,95 @@ public class MultiKeyColumnNameIteratorTest { final long maxValue = 10000; + /** * Write to both rows in parallel */ Observable.from( new String[] { rowKey1, rowKey2, rowKey3 } ) - .parallel( new Func1<Observable<String>, Observable<String>>() { - @Override - public Observable<String> call( final Observable<String> stringObservable ) { - return stringObservable.doOnNext( new Action1<String>() { - @Override - public void call( final String key ) { - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( long i = 0; i < maxValue; i++ ) { - batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); - - if ( i % 1000 == 0 ) { - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } - } - } - - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } + //perform a flatmap + .flatMap( stringObservable -> Observable.just( stringObservable ).doOnNext( key -> { + final MutationBatch batch = keyspace.prepareMutationBatch(); + + for ( long i = 0; i < maxValue; i++ ) { + batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); + + if ( i % 1000 == 0 ) { + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); } - } ); + } } - } ).toBlocking().last(); + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); + } + } ).subscribeOn( Schedulers.io() ) ).toBlocking().last(); - //create 3 iterators - ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false ); - ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false ); - ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false ); - final Comparator<Long> ascendingComparator = new Comparator<Long>() { - @Override - public int compare( final Long o1, final Long o2 ) { - return Long.compare( o1, o2 ); - } - }; + //create 3 iterators - /** - * Again, arbitrary buffer size to attempt we buffer at some point - */ - final MultiKeyColumnNameIterator<Long, Long> ascendingItr = + ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false ); + ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false ); + ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false ); + + final Comparator<Long> ascendingComparator = new Comparator<Long>() { + + @Override + public int compare( final Long o1, final Long o2 ) { + return Long.compare( o1, o2 ); + } + }; + + /** + * Again, arbitrary buffer size to attempt we buffer at some point + */ + final MultiKeyColumnNameIterator<Long, Long> ascendingItr = new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator, row2Iterator, row3Iterator ), - ascendingComparator, 900 ); + ascendingComparator, 900 ); - //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the - // trips required + //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the + // trips required - for ( long i = 0; i < maxValue; i++ ) { - assertEquals( i, ascendingItr.next().longValue() ); - } + for ( long i = 0; i < maxValue; i++ ) { + assertEquals( i, ascendingItr.next().longValue() ); + } - //now test it in reverse + //now test it in reverse - ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true ); - ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true ); - ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true ); + ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true ); + ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true ); + ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true ); - final Comparator<Long> descendingComparator = new Comparator<Long>() { + final Comparator<Long> descendingComparator = new Comparator<Long>() { - @Override - public int compare( final Long o1, final Long o2 ) { - return ascendingComparator.compare( o1, o2 ) * -1; - } - }; + @Override + public int compare( final Long o1, final Long o2 ) { + return ascendingComparator.compare( o1, o2 ) * -1; + } + }; - /** - * Again, arbitrary buffer size to attempt we buffer at some point - */ - final MultiKeyColumnNameIterator<Long, Long> descendingItr = + /** + * Again, arbitrary buffer size to attempt we buffer at some point + */ + final MultiKeyColumnNameIterator<Long, Long> descendingItr = new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc, row2IteratorDesc, row3IteratorDesc ), - descendingComparator, 900 ); + descendingComparator, 900 ); - for ( long i = maxValue - 1; i > -1; i-- ) { - assertEquals( i, descendingItr.next().longValue() ); + for ( long i = maxValue - 1; i > -1; i-- ) { + assertEquals( i, descendingItr.next().longValue() ); + } } - } @Test @@ -233,39 +229,28 @@ public class MultiKeyColumnNameIteratorTest { /** * Write to both rows in parallel */ - Observable.just( rowKey1 ) - .parallel( new Func1<Observable<String>, Observable<String>>() { - @Override - public Observable<String> call( final Observable<String> stringObservable ) { - return stringObservable.doOnNext( new Action1<String>() { - @Override - public void call( final String key ) { - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( long i = 0; i < maxValue; i++ ) { - batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); - - if ( i % 1000 == 0 ) { - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } - } - } - - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } - } - } ); - } - } ).toBlocking().last(); + Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> { + final MutationBatch batch = keyspace.prepareMutationBatch(); + + for ( long i = 0; i < maxValue; i++ ) { + batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); + + if ( i % 1000 == 0 ) { + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); + } + } + } + + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); + }} ).subscribeOn( Schedulers.io() ) ).toBlocking().last(); //create 3 iterators http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java index c32b820..d88ebe5 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java @@ -54,6 +54,7 @@ import rx.Observable; import rx.Observer; import rx.functions.Action1; import rx.functions.Func1; +import rx.schedulers.Schedulers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -373,38 +374,29 @@ public class MultiRowColumnIteratorTest { /** * Write to both rows in parallel */ - Observable.just( rowKey1 ).parallel( new Func1<Observable<String>, Observable<String>>() { - @Override - public Observable<String> call( final Observable<String> stringObservable ) { - return stringObservable.doOnNext( new Action1<String>() { - @Override - public void call( final String key ) { - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( long i = 0; i < maxValue; i++ ) { - batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); - - if ( i % 1000 == 0 ) { - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } - } - } + Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> { + final MutationBatch batch = keyspace.prepareMutationBatch(); - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( e ); - } + for ( long i = 0; i < maxValue; i++ ) { + batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE ); + + if ( i % 1000 == 0 ) { + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); } - } ); + } + } + + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( e ); } - } ).toBlocking().last(); + } ).subscribeOn( Schedulers.io() ) ).toBlocking().last(); //create 3 iterators
