Fix re-index memory leak caused by the flatMap observable, and speed up re-index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ef8899a1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ef8899a1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ef8899a1 Branch: refs/heads/usergrid-1318-queue Commit: ef8899a100b8488d4dfd528ce94a1cb8bea582fe Parents: 33319f3 Author: Michael Russo <[email protected]> Authored: Fri Sep 30 18:14:37 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Fri Sep 30 18:14:37 2016 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 20 ++++------- .../index/IndexProcessorFig.java | 2 +- .../index/ReIndexServiceImpl.java | 37 +++++++++----------- .../EntityCollectionManagerFactoryImpl.java | 2 ++ 4 files changed, 25 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 0bff887..a108e40 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -876,23 +876,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { - IndexOperationMessage batch = new IndexOperationMessage(); + final List<EntityIndexEvent> batch = new ArrayList<>(); + edges.forEach(e -> { - for ( EdgeScope e : edges){ + //change to id 
scope to avoid serialization issues + batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); - EntityIndexOperation entityIndexOperation = - new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince); - - IndexOperationMessage indexOperationMessage = - eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); - - if (indexOperationMessage != null){ - batch.ingest(indexOperationMessage); - } - - } + }); - queueIndexOperationMessage(batch); + offerBatch( batch ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index c05c047..1038408 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -90,7 +90,7 @@ public interface IndexProcessorFig extends GuicyFig { @Key(ELASTICSEARCH_QUEUE_IMPL) String getQueueImplementation(); - @Default("100") + @Default("500") @Key(REINDEX_BUFFER_SIZE) int getReindexBufferSize(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index e3b179d..19fbcfa 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.usergrid.persistence.index.EntityIndexFactory; @@ -113,6 +114,8 @@ public class ReIndexServiceImpl implements ReIndexService { //load our last emitted Scope if a cursor is present + final AtomicInteger count = new AtomicInteger(); + final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() ); @@ -161,29 +164,21 @@ public class ReIndexServiceImpl implements ReIndexService { } - Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes, + allEntityIdsObservable.getEdgesToEntities( applicationScopes, reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) - .buffer( indexProcessorFig.getReindexBufferSize()) - .flatMap( edgeScopes -> Observable.just(edgeScopes) - .doOnNext(edges -> { - - logger.info("Sending batch of {} to be indexed.", edges.size()); - indexService.indexBatch(edges, modifiedSince); - }) - .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor()); - - - // start our sampler and state persistence - // take a sample every sample interval to allow us to resume state with minimal loss - // create our flushing collector and flush the edge scopes to it - runningReIndex.collect(() -> new FlushingCollector(jobId), - ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))) - .doOnNext( flushingCollector-> flushingCollector.complete() ) - //subscribe on our I/O scheduler and run the task - .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe. 
- - + .doOnNext( edgeScopes -> { + logger.info("Sending batch of {} to be indexed.", edgeScopes.size()); + indexService.indexBatch(edgeScopes, modifiedSince); + count.addAndGet(edgeScopes.size() ); + if( edgeScopes.size() > 0 ) { + writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); + } + writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); }) + .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() )) + .subscribeOn( Schedulers.io() ).subscribe(); + + return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index fcaa51d..aa962dd 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -44,6 +44,8 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutionException;
