Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 9b939d15b -> 1e3de5725
Updated index refresh to be more procedural. Still a race condition on index processing and flush. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1e3de572 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1e3de572 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1e3de572 Branch: refs/heads/two-dot-o-dev Commit: 1e3de5725ce2fff16e44a4b1b068269d197411eb Parents: 9b939d1 Author: Todd Nine <[email protected]> Authored: Mon May 4 15:42:22 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon May 4 15:42:22 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 - .../usergrid/persistence/CollectionIT.java | 2 + .../core/future/FutureObservable.java | 6 +- .../persistence/index/IndexRefreshCommand.java | 2 +- .../index/impl/EsEntityIndexImpl.java | 2 - .../index/impl/EsIndexBufferConsumerImpl.java | 13 +- .../index/impl/IndexBufferConsumer.java | 2 +- .../index/impl/IndexRefreshCommandImpl.java | 170 +++++++++---------- 8 files changed, 94 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 6ffefe3..7d003cc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -579,8 +579,6 @@ public class CpEntityManager implements EntityManager { // first, update entity index in its own collection scope updateEntityMeter.mark(); - Timer.Context timer = updateEntityTimer.time(); - Id entityId = new SimpleId( entity.getUuid(), entity.getType() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java index fcdc11f..8c94d32 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java @@ -363,6 +363,8 @@ public class CollectionIT extends AbstractCoreIT { app.refreshIndex(); +// Thread.sleep(500); + final Query query = Query.fromQL( "nickname = 'ed'" ); Results r = em.searchCollection( group, "users", query.withResultsLevel( Level.LINKED_PROPERTIES ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java index f86ffc1..06eed4d 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java @@ -27,13 +27,11 @@ import java.util.concurrent.FutureTask; */ public class FutureObservable<T> { - private final T returnVal; private final FutureTask<T> future; public FutureObservable(final T returnVal) { - this.returnVal = returnVal; - future = new FutureTask<T>( () -> returnVal ); + future = new FutureTask<>( () -> returnVal ); } public void done() { @@ -41,6 +39,6 @@ public class FutureObservable<T> { } public Observable<T> observable() { - return !future.isDone() ? Observable.from(future) : Observable.just(returnVal); + return Observable.from(future); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java index 03be233..af7f814 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java @@ -28,7 +28,7 @@ public interface IndexRefreshCommand { Observable<IndexRefreshCommandInfo> execute(String[] indexes); - public static class IndexRefreshCommandInfo{ + class IndexRefreshCommandInfo{ private final boolean hasFinished; private final long executionTime; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 20a940f..904f58b 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -323,8 +323,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() { refreshIndexMeter.mark(); - Observable future = producer.put(new IndexOperationMessage()); - future.toBlocking().last(); return indexRefreshCommand.execute(getUniqueIndexes()); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index b2433bd..f1c493a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -108,7 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { } - public Observable put( IndexOperationMessage message ) { + public Observable<IndexOperationMessage> put( IndexOperationMessage message ) { Preconditions.checkNotNull(message, "Message cannot be null"); indexSizeCounter.inc( message.getDeIndexRequests().size() ); indexSizeCounter.inc( message.getIndexRequests().size() ); @@ -129,7 +129,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { //buffer on our new thread with a timeout observable.buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, - Schedulers.newThread() ).flatMap( indexOpBuffer -> { + Schedulers.io() ).flatMap( indexOpBuffer -> { //hand off to processor in new observable thread so we can continue to buffer faster return Observable.just( indexOpBuffer ).flatMap( @@ -199,13 +199,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { //subscribe to the operations that generate requests on a new thread so that we can execute them quickly //mark this as done - return processedIndexOperations.doOnNext( processedIndexOp -> - { - processedIndexOp.done(); - } - ).doOnError(t -> { - log.error("Unable to ack futures", t); - }); + return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done() + ).doOnError(t -> log.error("Unable to ack futures", t) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java index 6898f15..2906570 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java @@ -34,5 +34,5 @@ public interface IndexBufferConsumer { * @param message * @return */ - Observable put(IndexOperationMessage message); + Observable<IndexOperationMessage> put(IndexOperationMessage message); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 71a05a0..34f0e6e 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -20,26 +20,21 @@ package org.apache.usergrid.persistence.index.impl; -import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import com.amazonaws.services.elastictranscoder.model.TimeSpan; -import org.apache.usergrid.persistence.core.util.StringUtils; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.FilterBuilders; -import org.elasticsearch.indices.IndexMissingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.core.util.StringUtils; import org.apache.usergrid.persistence.index.AliasedEntityIndex; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.IndexFig; @@ -56,7 +51,6 @@ import com.codahale.metrics.Timer; import com.google.inject.Inject; import rx.Observable; -import rx.Subscriber; import rx.util.async.Async; @@ -72,13 +66,12 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { private final IndexBufferConsumer producer; private final IndexFig indexFig; private final Timer timer; - private final RxTaskScheduler rxTaskScheduler; + @Inject public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider, IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory, - final IndexCache indexCache, final RxTaskScheduler rxTaskScheduler ) { - + final IndexCache indexCache ) { this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh.timer" ); @@ -87,17 +80,16 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { this.producer = producer; this.indexFig = indexFig; this.indexCache = indexCache; - this.rxTaskScheduler = rxTaskScheduler; } @Override - public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) { + public Observable<IndexRefreshCommandInfo> execute( String[] indexes ) { final long start = System.currentTimeMillis(); - Timer.Context refreshTimer = timer.time(); + //id to hunt for final UUID uuid = UUIDUtils.newTimeUUID(); final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); @@ -113,90 +105,96 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { //save the item final IndexOperationMessage message = new IndexOperationMessage(); message.addIndexRequest( indexRequest ); - final Observable addRecord = producer.put(message); - final Observable refresh = refresh(indexes); + + //add the record to the index + final Observable<IndexOperationMessage> addRecord = producer.put( message ); + + //refresh the index + // final Observable<Boolean> refresh = refresh( indexes ); /** * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search */ //set our filter for entityId fieldname - final SearchRequestBuilder builder = - esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE) - .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId)); - - - //start our processing immediately - final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { - final Observable<IndexRefreshCommandInfo> infoObservable = Observable - .range(0, indexFig.maxRefreshSearches()) - .map(i -> - { - try { - return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); - } catch (Exception ee) { - logger.error("Failed during refresh search for " + uuid, ee); - throw new RuntimeException("Failed during refresh search for " + uuid, ee); - } - }) - .takeWhile(info -> info.hasFinished()) - .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS); - - final Observable<Boolean> combined = Observable.concat(addRecord, refresh); - combined.toBlocking().last(); - final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); - return info; - },rxTaskScheduler.getAsyncIOScheduler()).call(); - - - return future.doOnNext(found -> { - if (!found.hasFinished()) { - logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - } else { - logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); + + + /** + * We want to search once we've added our record, then refreshed + */ + final Observable<IndexRefreshCommandInfo> searchObservable = + Observable.range( 0, indexFig.maxRefreshSearches() ).map( i -> { + try { + + final SearchRequestBuilder builder = esProvider.getClient().prepareSearch( alias.getReadAlias() ) + .setTypes( IndexingUtils.ES_ENTITY_TYPE ) + .setPostFilter( FilterBuilders + .termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, + entityId ) ); + + + return new IndexRefreshCommandInfo( builder.execute().get().getHits().totalHits() > 0, + System.currentTimeMillis() - start ); } - }).doOnCompleted(() -> { - //clean up our data - String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); - DeIndexOperation deIndexRequest = - new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); + catch ( Exception ee ) { + logger.error( "Failed during refresh search for " + uuid, ee ); + throw new RuntimeException( "Failed during refresh search for " + uuid, ee ); + } + } ).skipWhile( info -> !info.hasFinished() ); - //delete the item - IndexOperationMessage indexOperationMessage = - new IndexOperationMessage(); - indexOperationMessage.addDeIndexRequest(deIndexRequest); - producer.put(indexOperationMessage); - refreshTimer.stop(); - }); - } + //chain it all together - private Observable<Boolean> refresh(final String[] indexes) { + //add the record, take it's last result. On the last add, we then execute the refresh command - return Observable.create(subscriber -> { - try { + final Observable<IndexRefreshCommandInfo> refreshResults = addRecord + + //after our add, run a refresh + .doOnNext( addResult -> { - if (indexes.length == 0) { - logger.debug("Not refreshing indexes. none found"); - } - //Added For Graphite Metrics - RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); - int failedShards = response.getFailedShards(); - int successfulShards = response.getSuccessfulShards(); - ShardOperationFailedException[] sfes = response.getShardFailures(); - if (sfes != null) { - for (ShardOperationFailedException sfe : sfes) { - logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason()); - } - } - logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards); - } catch (IndexMissingException e) { - logger.error("Unable to refresh index. Waiting before sleeping.", e); - throw e; + if ( indexes.length == 0 ) { + logger.debug( "Not refreshing indexes. none found" ); + } + //Added For Graphite Metrics + RefreshResponse response = + esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet(); + int failedShards = response.getFailedShards(); + int successfulShards = response.getSuccessfulShards(); + ShardOperationFailedException[] sfes = response.getShardFailures(); + if ( sfes != null ) { + for ( ShardOperationFailedException sfe : sfes ) { + logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() ); + } } - subscriber.onNext(true); - subscriber.onCompleted(); - }); + logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ), + successfulShards, failedShards ); + } ) + + //once the refresh is done execute the search + .flatMap( refreshCommandResult -> searchObservable ) + + //check when found + .doOnNext( found -> { + if ( !found.hasFinished() ) { + logger.error( "Couldn't find record during refresh uuid: {} took ms:{} ", uuid, + found.getExecutionTime() ); + } + else { + logger.info( "found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime() ); + } + } ).doOnCompleted( () -> { + //clean up our data + String[] aliases = indexCache.getIndexes( alias, AliasedEntityIndex.AliasType.Read ); + DeIndexOperation deIndexRequest = + new DeIndexOperation( aliases, appScope, edge, entity.getId(), entity.getVersion() ); - } + //delete the item + IndexOperationMessage indexOperationMessage = new IndexOperationMessage(); + indexOperationMessage.addDeIndexRequest( deIndexRequest ); + producer.put( indexOperationMessage ); + } ); + + + return Async.start( () -> 1 ).flatMap( intValue -> ObservableTimer.time( refreshResults, timer ) ); + } }
