Convert index refresh polling to an Observable pipeline and add a configurable refresh wait time
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5b8a4dea Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5b8a4dea Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5b8a4dea Branch: refs/heads/USERGRID-587 Commit: 5b8a4deac24b23a08635150a708c9a52ea3cbb96 Parents: a05b46d Author: Shawn Feldman <[email protected]> Authored: Thu Apr 30 10:15:53 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Apr 30 10:15:53 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/index/IndexFig.java | 5 ++ .../index/impl/IndexRefreshCommandImpl.java | 88 +++++++++++--------- 2 files changed, 52 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b8a4dea/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index e0cacb8..961719d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -190,6 +190,11 @@ public interface IndexFig extends GuicyFig { @Default("25") int maxRefreshSearches(); + @Key("elasticsearch.refresh_wait_ms") + @Default("5000") + long refreshWaitTime(); + + @Default( "5000" ) @Key( ELASTICSEARCH_WRITE_TIMEOUT ) long getWriteTimeout(); 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b8a4dea/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 06e6219..888a805 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -20,9 +20,12 @@ package org.apache.usergrid.persistence.index.impl; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import com.amazonaws.services.elastictranscoder.model.TimeSpan; import org.apache.usergrid.persistence.core.util.StringUtils; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -89,7 +92,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Override - public Observable<IndexRefreshCommandInfo> execute(String[] indexes) { + public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) { final long start = System.currentTimeMillis(); Timer.Context refreshTimer = timer.time(); @@ -130,51 +133,54 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { //start our processing immediately - final Observable<IndexRefreshCommandInfo> future = Async.toAsync( () -> { - final Observable combined = Observable.concat(addRecord, refresh); - combined.toBlocking().lastOrDefault(null); - try { - boolean found = false; - for ( int i = 0; i < indexFig.maxRefreshSearches(); i++ ) { - 
final SearchResponse response = builder.execute().get(); - if (response.getHits().totalHits() > 0) { - found = true; - break; + final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { + final Observable<IndexRefreshCommandInfo> infoObservable = Observable + .range(0, indexFig.maxRefreshSearches()) + .map(i -> + { + try { + return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); + } catch (Exception ee) { + logger.error("Failed during refresh search for " + uuid, ee); + throw new RuntimeException("Failed during refresh search for " + uuid, ee); } - } + }) + .takeWhile(info -> info.hasFinished()) + .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS ); - return new IndexRefreshCommandInfo(found,System.currentTimeMillis() - start); - } - catch ( Exception ee ) { - logger.error( "Failed during refresh search for " + uuid, ee ); - throw new RuntimeException( "Failed during refresh search for " + uuid, ee ); - } - }, rxTaskScheduler.getAsyncIOScheduler() ).call(); + final Observable<Boolean> combined = Observable.concat(addRecord, refresh); + combined.toBlocking().last(); + final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); - return future.doOnNext( found -> { - if ( !found.hasFinished() ) { - logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - }else{ - logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - } - } ).doOnCompleted(() -> { - //clean up our data - String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); - DeIndexOperation deIndexRequest = - new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); - - //delete the item - IndexOperationMessage indexOperationMessage = - new IndexOperationMessage(); - indexOperationMessage.addDeIndexRequest( deIndexRequest ); - producer.put( indexOperationMessage 
); - - refreshTimer.stop(); - }); - } + return info; + + },rxTaskScheduler.getAsyncIOScheduler()).call(); - private Observable<Boolean> refresh(final String[] indexes) { + + return future.doOnNext(found -> { + if (!found.hasFinished()) { + logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); + } else { + logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); + } + }).doOnCompleted(() -> { + //clean up our data + String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); + DeIndexOperation deIndexRequest = + new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); + + //delete the item + IndexOperationMessage indexOperationMessage = + new IndexOperationMessage(); + indexOperationMessage.addDeIndexRequest(deIndexRequest); + producer.put(indexOperationMessage); + + refreshTimer.stop(); + }); + } + + private Observable<Boolean> refresh(final String[] indexes) { return Observable.create(subscriber -> { try {
