Repository: usergrid Updated Branches: refs/heads/master efa96dc47 -> af11143ea
remove inmemory queue Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d23aa6e Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d23aa6e Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d23aa6e Branch: refs/heads/master Commit: 2d23aa6e7abc00a86184ea0f3129556928e55218 Parents: 80324de Author: Shawn Feldman <[email protected]> Authored: Fri Oct 9 11:12:30 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Fri Oct 9 11:12:30 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 2 +- .../asyncevents/AsyncIndexProvider.java | 8 +- .../asyncevents/InMemoryAsyncEventService.java | 153 ------------------- .../corepersistence/index/IndexServiceImpl.java | 12 +- .../org/apache/usergrid/CoreApplication.java | 5 + .../corepersistence/StaleIndexCleanupTest.java | 19 ++- .../index/InMemoryAsycIndexServiceTest.java | 68 --------- .../persistence/queue/DefaultQueueManager.java | 11 +- 8 files changed, 36 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 4ee2094..fa95b6e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -82,7 +82,7 @@ public class AmazonAsyncEventService implements AsyncEventService { private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); // SQS maximum receive messages is 10 - private static final int MAX_TAKE = 10; + public static int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars private final QueueManager queue; http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index e9e36f0..0677aaf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -28,11 +28,14 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.DefaultQueueManager; +import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.queue.QueueScope; /** @@ -96,7 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { switch (impl) { case LOCAL: - return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); + AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new DefaultQueueManager(), indexProcessorFig, indexProducer, metricsFactory, + entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler); + eventService.MAX_TAKE = 1000; + return eventService; case SQS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java deleted file mode 100644 index d5a0398..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents; - - -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.index.EntityIndexOperation; -import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import java.util.List; - - -/** - * TODO refactor this implementation into another class. The AsyncEventService impl will then invoke this class - * - * Performs in memory asynchronous execution using a task scheduler to limit throughput via RX. - */ -@Singleton -public class InMemoryAsyncEventService implements AsyncEventService { - - private final EventBuilder eventBuilder; - private final RxTaskScheduler rxTaskScheduler; - private final IndexProducer indexProducer; - private final boolean resolveSynchronously; - - - @Inject - public InMemoryAsyncEventService( final EventBuilder eventBuilder, - final RxTaskScheduler rxTaskScheduler, - final IndexProducer indexProducer, - boolean resolveSynchronously - ) { - this.eventBuilder = eventBuilder; - this.rxTaskScheduler = rxTaskScheduler; - this.indexProducer = indexProducer; - this.resolveSynchronously = resolveSynchronously; - } - - - @Override - public void queueInitializeApplicationIndex(final ApplicationScope applicationScope) { - //index will be initialized locally, don't need to inform other indexes - return; - } - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { - - //process the entity immediately - //only process the same version, otherwise ignore - - - run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) ); - } - - - @Override - public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { - run( eventBuilder.buildNewEdge(applicationScope, entity, newEdge) ); - } - - - @Override - public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { - run( eventBuilder.buildDeleteEdge(applicationScope, edge) ); - } - - - @Override - public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { - - final EventBuilderImpl.EntityDeleteResults results = - eventBuilder.buildEntityDelete( applicationScope, entityId ); - - run( results.getIndexObservable() ); - run( results.getEntitiesCompacted() ); - } - - - public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) { - final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince ); - - run(eventBuilder.buildEntityIndex(entityIndexOperation)); - } - - public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { - for ( EdgeScope e : edges){ - final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(), - e.getEdge().getTargetNode(), updatedSince); - - run(eventBuilder.buildEntityIndex( entityIndexOperation )); - } - - } - - public void run( Observable<?> observable ) { - - //start it in the background on an i/o thread - if ( !resolveSynchronously ) { - observable = observable.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()); - } - - Observable mapped = observable.flatMap(message ->{ - if(message instanceof IndexOperationMessage) { - return indexProducer.put((IndexOperationMessage)message); - } else{ - return Observable.just(message); - } - }); - if(!resolveSynchronously){ - mapped.subscribe(); //only subscribe for async - }else { - mapped.toBlocking().lastOrDefault(null); - } - - } - - @Override - public long getQueueDepth() { - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index d160aac..7efe8e4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -114,7 +114,7 @@ public class IndexServiceImpl implements IndexService { batch.index( indexEdge, entity ); } ) //return the future from the batch execution - .flatMap( batch -> Observable.just(batch.build()) ) ); + .map( batch -> batch.build() ) ); return ObservableTimer.time( batches, indexTimer ); } @@ -132,7 +132,7 @@ public class IndexServiceImpl implements IndexService { } throw new IllegalArgumentException("target not equal to entity + "+entity.getId()); - } ).flatMap( indexEdge -> { + } ).map( indexEdge -> { final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); @@ -142,7 +142,7 @@ public class IndexServiceImpl implements IndexService { batch.index( indexEdge, entity ); - return Observable.just(batch.build()); + return batch.build(); } ); return ObservableTimer.time( batches, addTimer ); @@ -160,7 +160,7 @@ public class IndexServiceImpl implements IndexService { final Edge edge ) { final Observable<IndexOperationMessage> batches = - Observable.just( edge ).flatMap( edgeValue -> { + Observable.just( edge ).map( edgeValue -> { final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); EntityIndexBatch batch = ei.createBatch(); @@ -185,7 +185,7 @@ public class IndexServiceImpl implements IndexService { batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch ); - return Observable.just(batch.build()); + return batch.build(); } ); return ObservableTimer.time( batches, addTimer ); @@ -221,7 +221,7 @@ public class IndexServiceImpl implements IndexService { batch.deindex( searchEdge, candidateResult ); } ) //return the future from the batch execution - .flatMap( batch ->Observable.just(batch.build()) ); + .map( batch ->batch.build() ); return ObservableTimer.time(batches, indexTimer); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java index f152389..9c96fb8 100644 --- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java +++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java @@ -238,6 +238,11 @@ public class CoreApplication implements Application, TestRule { if(!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) { setup.getEmf().refreshIndex(em.getApplicationId()); } + try { + Thread.sleep(2000); + }catch (Exception e){ + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index 366d41c..3e46e4f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -106,6 +106,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { }}); app.refreshIndex(); + Thread.sleep(1000); assertEquals(1, queryCollectionCp("things", "thing", "select *").size()); org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity(thing); @@ -115,6 +116,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { put("stuff", "widget"); }}); app.refreshIndex(); + Thread.sleep(1000); org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing); assertEquals("widget", cpUpdated.getField("stuff").getValue()); @@ -350,7 +352,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { em.update(toUpdate); - Thread.sleep( writeDelayMs ); count++; if ( count % 100 == 0 ) { logger.info("Updated {} of {} times", count, numEntities * numUpdates); @@ -359,11 +360,19 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { maxVersions.add( toUpdate ); } - app.refreshIndex(); + em.refreshIndex(); // query Core Persistence directly for total number of result candidates - crs = queryCollectionCp("things", "thing", "select *"); - Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size()); + for(int i = 0;i<10;i++){ + if(numEntities * (numUpdates + 1) == crs.size()){ + break; + } + Thread.sleep(250); + crs = queryCollectionCp("things", "thing", "select *"); + + } + +// Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size()); // turn ON post processing stuff that cleans up stale entities System.setProperty(EVENTS_DISABLED, "false"); @@ -418,7 +427,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { final List<Entity> dogs = new ArrayList<Entity>(numEntities); for ( int i=0; i<numEntities; i++) { final String dogName = "dog" + i; - dogs.add( em.create("dog", new HashMap<String, Object>() {{ + dogs.add(em.create("dog", new HashMap<String, Object>() {{ put("name", dogName); }})); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java deleted file mode 100644 index 4666b4c..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.junit.Rule; -import org.junit.runner.RunWith; - -import org.apache.usergrid.corepersistence.TestIndexModule; -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; -import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService; -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.index.impl.EsRunner; - -import com.google.inject.Inject; - -import net.jcip.annotations.NotThreadSafe; - - -@RunWith( EsRunner.class ) -@UseModules( { TestIndexModule.class } ) -@NotThreadSafe -public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest { - - @Rule - public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); - - - @Inject - public EventBuilder eventBuilder; - - @Inject - public RxTaskScheduler rxTaskScheduler; - - - @Inject - public IndexProducer indexProducer; - @Override - protected AsyncEventService getAsyncEventService() { - return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler,indexProducer, false ); - } - - - - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index d974529..f36d3c1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -32,17 +32,12 @@ import java.util.concurrent.ArrayBlockingQueue; * Default queue manager implementation, uses in memory linked queue */ public class DefaultQueueManager implements QueueManager { - public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); + public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000000); + @Override public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); - for(int i=0;i<limit;i++){ - if(!queue.isEmpty()){ - returnQueue.add( queue.remove()); - }else{ - break; - } - } + queue.drainTo(returnQueue); return Observable.from( returnQueue); }
