Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-486 72ec19d56 -> dcf469378
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 26d06ad..ef258f4 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -188,7 +188,7 @@ public class GraphManagerImpl implements GraphManager { final Timer.Context timer = writeEdgeTimer.time(); final Meter meter = writeEdgeMeter; - return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() { + return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() { @Override public Edge call( final MarkedEdge edge ) { @@ -234,7 +234,7 @@ public class GraphManagerImpl implements GraphManager { final Timer.Context timer = deleteEdgeTimer.time(); final Meter meter = deleteEdgeMeter; - return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() { + return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() { @Override public Edge call(final MarkedEdge edge) { @@ -281,7 +281,7 @@ public class GraphManagerImpl implements GraphManager { public Observable<Id> deleteNode( final Id node, final long timestamp ) { final Timer.Context timer = deleteNodeTimer.time(); final Meter meter = deleteNodeMeter; - return Observable.from( node ).map( new Func1<Id, Id>() { + return Observable.just( node ).map( new Func1<Id, Id>() { @Override public Id call( final Id id ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java index ab141f7..bfaeaaa 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java @@ -176,6 +176,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair { * Sum up the total number of edges we had, then execute the mutation if we have * anything to do */ + + return MathObservable.sumInteger( Observable.merge( checks ) ) .doOnNext( new Action1<Integer>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java index e8c224e..6236a16 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java @@ -103,7 +103,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener { public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) { - return Observable.from( node ) + return Observable.just( node ) //delete source and targets in parallel and merge them into a single observable .flatMap( new Func1<Id, Observable<Integer>>() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java index 2d9b47f..ecb9a9b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; +import rx.schedulers.Schedulers; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -75,61 +76,49 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> { } - - @Override - public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider, - final ProgressObserver observer ) { + public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider, + final ProgressObserver observer ) { final AtomicLong counter = new AtomicLong(); - final MigrationRelationship<EdgeMetadataSerialization> - migration = allVersions.getMigrationRelationship( currentVersion ); - - final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( new Func1<GraphNode, - Observable<List<Edge>>>() { - @Override - public Observable<List<Edge>> call( final GraphNode graphNode ) { - final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope ); - - //get edges from the source - return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() { - @Override - public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) { - return listObservable.doOnNext( new Action1<List<Edge>>() { - @Override - public void call( List<Edge> edges ) { - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( Edge edge : edges ) { - logger.info( "Migrating meta for edge {}", edge ); - final MutationBatch edgeBatch = - migration.to.writeEdge( graphNode.applicationScope, edge ); - batch.mergeShallow( edgeBatch ); - } - - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to perform migration", e ); - } - - //update the observer so the admin can see it - final long newCount = counter.addAndGet( edges.size() ); - - observer.update( migration.to.getImplementationVersion(), - String.format( "Currently running. Rewritten %d edge types", - newCount ) ); - } - } ); - } } ); - }} ); - - observable.longCount().toBlocking().last(); + final MigrationRelationship<EdgeMetadataSerialization> migration = + allVersions.getMigrationRelationship( currentVersion ); - return migration.to.getImplementationVersion(); + final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( graphNode -> { + final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope ); + + //get edges from the source + return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ) + .doOnNext( edges -> { + final MutationBatch batch = keyspace.prepareMutationBatch(); + + for ( Edge edge : edges ) { + logger.info( "Migrating meta for edge {}", edge ); + final MutationBatch edgeBatch = + migration.to.writeEdge( graphNode.applicationScope, edge ); + batch.mergeShallow( edgeBatch ); + } + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to perform migration", e ); + } + + //update the observer so the admin can see it + final long newCount = counter.addAndGet( edges.size() ); + + observer.update( migration.to.getImplementationVersion(), String + .format( "Currently running. Rewritten %d edge types", + newCount ) ); + } ).subscribeOn( Schedulers.io() ); + }, 10 ); + + observable.countLong().toBlocking().last(); + + return migration.to.getImplementationVersion(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 6d30d22..3bbf3e4 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -516,7 +516,7 @@ public class GraphManagerShardConsistencyIT { } } ) - .longCount().toBlocking().last(); + .countLong().toBlocking().last(); // if(returnedEdgeCount != count[0]-duplicate[0]){ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java index 0a27a6b..7b3fafd 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java @@ -63,23 +63,23 @@ public class SimpleTest { Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, System.currentTimeMillis() ); - gm.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null ); + gm.writeEdge( testTargetEdge ).toBlocking().singleOrDefault( null ); Edge testTarget2Edge = createEdge( sourceId2, "edgeType1", targetId1, System.currentTimeMillis() ); - gm.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null ); + gm.writeEdge( testTarget2Edge ).toBlocking().singleOrDefault( null ); Edge test2TargetEdge = createEdge( sourceId1, "edgeType1", targetId1, System.currentTimeMillis() ); - gm.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null ); + gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null ); Edge test3TargetEdge = createEdge( sourceId1, "edgeType2", targetId1, System.currentTimeMillis() ); - gm.writeEdge( test3TargetEdge ).toBlockingObservable().singleOrDefault( null ); + gm.writeEdge( test3TargetEdge ).toBlocking().singleOrDefault( null ); int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, null, null) ) - .count().toBlockingObservable().last(); + .count().toBlocking().last(); assertEquals( 3, count ); count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, "edgeType", null) ) - .count().toBlockingObservable().last(); + .count().toBlocking().last(); assertEquals( 2, count ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java index a269c15..049c3d2 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java @@ -121,7 +121,7 @@ public class EdgeDataMigrationImplTest implements DataMigrationResetRule.DataMig //walk from s1 and s2 - final Observable<GraphNode> graphNodes = Observable.from( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) ); + final Observable<GraphNode> graphNodes = Observable.just( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) ); final MigrationDataProvider<GraphNode> testMigrationProvider = new MigrationDataProvider<GraphNode>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml index 9656e2d..3ec7852 100644 --- a/stack/corepersistence/pom.xml +++ b/stack/corepersistence/pom.xml @@ -47,8 +47,8 @@ limitations under the License. <properties> - <maven.compiler.source>1.7</maven.compiler.source> - <maven.compiler.target>1.7</maven.compiler.target> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> <antlr.version>3.4</antlr.version> <archaius.version>0.5.12</archaius.version> @@ -64,14 +64,14 @@ limitations under the License. <guava.version>18.0</guava.version> <guice.version>4.0-beta5</guice.version> <guicyfig.version>3.2</guicyfig.version> - <hystrix.version>1.3.16</hystrix.version> + <hystrix.version>1.4.0</hystrix.version> <jackson-2-version>2.4.1</jackson-2-version> <jackson-smile.verson>2.4.3</jackson-smile.verson> <mockito.version>1.10.8</mockito.version> <junit.version>4.11</junit.version> <kryo-serializers.version>0.26</kryo-serializers.version> <log4j.version>1.2.17</log4j.version> - <rx.version>0.19.6</rx.version> + <rx.version>1.0.8</rx.version> <slf4j.version>1.7.2</slf4j.version> <surefire.version>2.16</surefire.version> <aws.version>1.9.0</aws.version> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java index c962d6b..82af950 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.UUID; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -32,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.guice.IndexTestFig; @@ -41,13 +41,11 @@ import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.IntegerField; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.inject.Inject; import rx.Observable; import rx.functions.Action1; -import rx.functions.Action2; import rx.functions.Func1; import rx.schedulers.Schedulers; @@ -57,6 +55,7 @@ import rx.schedulers.Schedulers; */ @RunWith( EsRunner.class ) @UseModules( { TestIndexModule.class } ) +@Ignore( "Should only be run during load tests of elasticsearch" ) public class IndexLoadTestsIT extends BaseIT { private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class ); @@ -70,13 +69,14 @@ public class IndexLoadTestsIT extends BaseIT { @Inject public EntityIndexFactory entityIndexFactory; + @Test - public void testHeavyLoad(){ + public void testHeavyLoad() { final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() ); - final Id applicationId = new SimpleId(applicationUUID, "application"); - final ApplicationScope scope = new ApplicationScopeImpl( applicationId ); + final Id applicationId = new SimpleId( applicationUUID, "application" ); + final ApplicationScope scope = new ApplicationScopeImpl( applicationId ); final EntityIndex index = entityIndexFactory.createEntityIndex( scope ); @@ -87,83 +87,52 @@ public class IndexLoadTestsIT extends BaseIT { //run them all createEntities.toBlocking().last(); - - - - } - public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex, final Id ownerId){ - - //create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread() - return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>, Observable<Entity>>() { - - @Override - public Observable<Entity> call( final Observable<Integer> integerObservable ) { - return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>() { - @Override - public Observable<Entity> call( final Integer integer ) { - return createWriteObservable( entityIndex, ownerId, integer ); - } - } ); + public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id ownerId ) { - } - }, Schedulers.newThread() ); + //create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread() + return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap( + integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn( Schedulers.newThread() ) ); } - private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, final int workerIndex){ + private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, + final int workerIndex ) { final IndexScope scope = new IndexScopeImpl( ownerId, "test" ); - - return Observable.range( 0, indexTestFig.getNumberOfRecords() ) + return Observable.range( 0, indexTestFig.getNumberOfRecords() ) //create our entity - .map( new Func1<Integer, Entity>() { - @Override - public Entity call( final Integer integer ) { - final Entity entity = new Entity("test"); - - entity.setField( new IntegerField("workerIndex", workerIndex)); - entity.setField( new IntegerField( "ordinal", integer ) ); - - return entity; - } - } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() { - @Override - public void call( final List<Entity> entities ) { - //take our entities and roll them into a batch - Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch, Entity>() { - - - @Override - public void call( final EntityIndexBatch entityIndexBatch, final Entity entity ) { - entityIndexBatch.index(scope, entity ); - } - } ).doOnNext( new Action1<EntityIndexBatch>() { - @Override - public void call( final EntityIndexBatch entityIndexBatch ) { + .map( new Func1<Integer, Entity>() { + @Override + public Entity call( final Integer integer ) { + final Entity entity = new Entity( "test" ); + + entity.setField( new IntegerField( "workerIndex", workerIndex ) ); + entity.setField( new IntegerField( "ordinal", integer ) ); + + return entity; + } + } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() { + @Override + public void call( final List<Entity> entities ) { + //take our entities and roll them into a batch + Observable.from( entities ) + .collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> { + + entityIndexBatch.index( scope, entity ); + } ).doOnNext( entityIndexBatch -> { entityIndexBatch.execute(); - } - } ).toBlocking().last(); - } - } ) - - //translate back into a stream of entities for the caller to use - .flatMap( new Func1<List<Entity>, Observable<Entity>>() { - @Override - public Observable<Entity> call( final List<Entity> entities ) { - return Observable.from( entities ); - } - } ); + } ).toBlocking().last(); + } + } ) + //translate back into a stream of entities for the caller to use + .flatMap(entities -> Observable.from( entities ) ); } - - - - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/pom.xml ---------------------------------------------------------------------- diff --git a/stack/pom.xml b/stack/pom.xml index f24917a..efbda2d 100644 --- a/stack/pom.xml +++ b/stack/pom.xml @@ -125,7 +125,7 @@ <usergrid.it.threads>8</usergrid.it.threads> <metrics.version>3.0.0</metrics.version> - <rx.version>0.19.6</rx.version> + <rx.version>1.0.8</rx.version> <surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName> <surefire.plugin.version>2.18.1</surefire.plugin.version> <powermock.version>1.6.1</powermock.version> @@ -1560,8 +1560,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> <optimize>true</optimize> <showDeprecation>true</showDeprecation> <debug>true</debug> @@ -1583,7 +1583,7 @@ <configuration> <rules> <requireJavaVersion> - <version>1.7.0</version> + <version>1.8.0</version> </requireJavaVersion> <requireMavenVersion> <version>[3.0,)</version> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java index 4f849e0..bebd557 100644 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java @@ -937,18 +937,12 @@ public class ImportServiceImpl implements ImportService { // potentially skip the first n if this is a resume operation final int entityNumSkip = (int)tracker.getTotalEntityCount(); - // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail - final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() { - @Override - public Boolean call( final WriteEvent writeEvent ) { - return !tracker.shouldStopProcessingEntities(); - } - } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() { - @Override - public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) { - return entityWrapperObservable.doOnNext(doWork); - } - }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last(); + + entityEventObservable.takeWhile( writeEvent -> !tracker.shouldStopProcessingEntities() ).skip( entityNumSkip ) + .flatMap( writeEvent -> { + return Observable.just( writeEvent ).doOnNext( doWork ); + }, 10 ).reduce( 0, heartbeatReducer ).toBlocking().last(); + jp.close(); @@ -979,17 +973,11 @@ public class ImportServiceImpl implements ImportService { final int connectionNumSkip = (int)tracker.getTotalConnectionCount(); // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail - final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() { - @Override - public Boolean call( final WriteEvent writeEvent ) { - return !tracker.shouldStopProcessingConnections(); - } - } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() { - @Override - public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) { - return entityWrapperObservable.doOnNext(doWork); - } - }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last(); + final int connectionCount = otherEventObservable.takeWhile( + writeEvent -> !tracker.shouldStopProcessingConnections() ).skip(connectionNumSkip).flatMap( entityWrapper ->{ + return Observable.just(entityWrapper).doOnNext( doWork ).subscribeOn( Schedulers.io() ); + + }, 10 ).reduce(0, heartbeatReducer).toBlocking().last(); jp.close(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 5b1a6b3..b183daa 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -110,84 +110,81 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final UUID appId = em.getApplication().getUuid(); final Map<String,Object> payloads = notification.getPayloads(); - final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() { - @Override - public Entity call(Entity entity) { + final Func1<Entity,Entity> entityListFunct = entity -> { - try { + try { - long now = System.currentTimeMillis(); - List<EntityRef> devicesRef = getDevices(entity); // resolve group + long now = System.currentTimeMillis(); + List<EntityRef> devicesRef = getDevices(entity); // resolve group - LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size()); + LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size()); - for (EntityRef deviceRef : devicesRef) { - LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid()); - long hash = MurmurHash.hash(deviceRef.getUuid()); - if (sketch.estimateCount(hash) > 0) { //look for duplicates - LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid()); - continue; - } else { - sketch.add(hash, 1); - } - String notifierId = null; - String notifierKey = null; - - //find the device notifier info, match it to the payload - for (Map.Entry<String, Object> entry : payloads.entrySet()) { - ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase()); - now = System.currentTimeMillis(); - String providerId = getProviderId(deviceRef, adapter.getNotifier()); - if (providerId != null) { - notifierId = providerId; - notifierKey = entry.getKey().toLowerCase(); - break; - } - LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid()); - } + for (EntityRef deviceRef : devicesRef) { + LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid()); + long hash = MurmurHash.hash(deviceRef.getUuid()); + if (sketch.estimateCount(hash) > 0) { //look for duplicates + LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid()); + continue; + } else { + sketch.add(hash, 1); + } + String notifierId = null; + String notifierKey = null; - if (notifierId == null) { - LOG.info("Notifier did not match for device {} ", deviceRef); - continue; + //find the device notifier info, match it to the payload + for (Map.Entry<String, Object> entry : payloads.entrySet()) { + ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase()); + now = System.currentTimeMillis(); + String providerId = getProviderId(deviceRef, adapter.getNotifier()); + if (providerId != null) { + notifierId = providerId; + notifierKey = entry.getKey().toLowerCase(); + break; } + LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid()); + } - ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); - if (notification.getQueued() == null) { - // update queued time - now = System.currentTimeMillis(); - notification.setQueued(System.currentTimeMillis()); - LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid()); - } + if (notifierId == null) { + LOG.info("Notifier did not match for device {} ", deviceRef); + continue; + } + + ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); + if (notification.getQueued() == null) { + // update queued time now = System.currentTimeMillis(); - qm.sendMessage(message); - LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid()); - deviceCount.incrementAndGet(); - queueMeter.mark(); + notification.setQueued(System.currentTimeMillis()); + LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid()); } - } catch (Exception deviceLoopException) { - LOG.error("Failed to add devices", deviceLoopException); - errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException); + now = System.currentTimeMillis(); + qm.sendMessage(message); + LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid()); + deviceCount.incrementAndGet(); + queueMeter.mark(); } - return entity; + } catch (Exception deviceLoopException) { + LOG.error("Failed to add devices", deviceLoopException); + errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException); } + return entity; }; long now = System.currentTimeMillis(); - Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator)) - .parallel(new Func1<Observable<Entity>, Observable<Entity>>() { - @Override - public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) { - return deviceObservable.map(entityListFunct); - } - }, Schedulers.io()) - .doOnError(new Action1<Throwable>() { - @Override - public void call(Throwable throwable) { - LOG.error("Failed while writing", throwable); - } - }); - o.toBlocking().lastOrDefault(null); - LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now); + + + //process up to 10 concurrently + Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator ) ) + + .flatMap( entity -> Observable.just( entity ).map( entityListFunct ) + .doOnError( throwable -> { + LOG.error( "Failed while writing", + throwable ); + } ).subscribeOn( Schedulers.io() ) + + , 10 ); + + o.toBlocking().lastOrDefault( null ); + LOG.info( "notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now); } // update queued time @@ -338,48 +335,39 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { return message; } }; - Observable o = rx.Observable.from(messages) - .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() { - @Override - public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) { - return messageObservable.map(func); + + //from each queue message, process them in parallel up to 10 at a time + Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> { + + + return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map( queueMessages -> { + //for gcm this will actually send notification + for ( ProviderAdapter providerAdapter : notifierMap.values() ) { + try { + providerAdapter.doneSendingNotifications(); } - }, Schedulers.io()) - .buffer(messages.size()) - .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() { - @Override - public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) { - //for gcm this will actually send notification - for (ProviderAdapter providerAdapter : notifierMap.values()) { - try { - providerAdapter.doneSendingNotifications(); - } catch (Exception e) { - LOG.error("providerAdapter.doneSendingNotifications: ", e); - } + catch ( Exception e ) { + LOG.error( "providerAdapter.doneSendingNotifications: ", e ); + } + } + //TODO: check if a notification is done and mark it + HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>(); + for ( ApplicationQueueMessage message : queueMessages ) { + if ( notifications.get( message.getNotificationId() ) == null ) { + try { + TaskManager taskManager = taskMap.get( message.getNotificationId() ); + notifications.put( message.getNotificationId(), message ); + taskManager.finishedBatch(); } - //TODO: check if a notification is done and mark it - HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>(); - for (ApplicationQueueMessage message : queueMessages) { - if (notifications.get(message.getNotificationId()) == null) { - try { - TaskManager taskManager = taskMap.get(message.getNotificationId()); - notifications.put(message.getNotificationId(), message); - taskManager.finishedBatch(); - } catch (Exception e) { - LOG.error("Failed to finish batch", e); - } - } - + catch ( Exception e ) { + LOG.error( "Failed to finish batch", e ); } - return notifications; - } - }) - .doOnError(new Action1<Throwable>() { - @Override - public void call(Throwable throwable) { - LOG.error("Failed while sending",throwable); } - }); + } + return notifications; + } ).doOnError( throwable -> LOG.error( "Failed while sending", throwable ) ); + }, 10 ); + return o; } @@ -400,7 +388,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { * {"winphone":"mymessage","apple":"mymessage"} * TODO: document this method better */ - private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception { + private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> + notifierMap) throws Exception { Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size()); for (Map.Entry<String, Object> entry : payloads.entrySet()) { String payloadKey = entry.getKey().toLowerCase(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java ---------------------------------------------------------------------- diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java index e8c5ace..6d0419a 100644 --- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java +++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java @@ -20,6 +20,8 @@ package org.apache.usergrid.setup; +import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +108,19 @@ public class ConcurrentProcessSingleton { barrier.await( ONE_MINUTE ); logger.info( "Setup to complete" ); - lock.maybeReleaseLock(); + + Runtime.getRuntime().addShutdownHook( new Thread( ){ + @Override + public void run() { + try { + lock.maybeReleaseLock(); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to release lock" ); + } + } + }); + } catch ( Exception e ) { throw new RuntimeException( "Unable to initialize system", e );
