Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 77a18f596 -> a05b46d93
adding refresh back Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a05b46d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a05b46d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a05b46d9 Branch: refs/heads/two-dot-o-dev Commit: a05b46d9359e0f5a8ff81c4c790db1bc236d4484 Parents: 77a18f5 Author: Shawn Feldman <[email protected]> Authored: Thu Apr 30 09:29:32 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Apr 30 09:29:32 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/index/IndexFig.java | 8 ---- .../persistence/index/IndexRefreshCommand.java | 2 +- .../index/impl/EsEntityIndexImpl.java | 2 +- .../index/impl/IndexRefreshCommandImpl.java | 48 +++++++++++++++++--- 4 files changed, 43 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index cf91da9..e0cacb8 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -190,14 +190,6 @@ public interface IndexFig extends GuicyFig { @Default("25") int maxRefreshSearches(); - @Key("elasticsearch.refresh_sleep_ms") - @Default("100") - long refreshSleep(); - - @Default( "5000" ) - @Key( ELASTICSEARCH_QUERY_TIMEOUT ) - long getQueryTimeout(); - @Default( "5000" ) @Key( ELASTICSEARCH_WRITE_TIMEOUT ) long getWriteTimeout(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java index 5cc532a..03be233 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java @@ -26,7 +26,7 @@ import rx.Observable; */ public interface IndexRefreshCommand { - Observable<IndexRefreshCommandInfo> execute(); + Observable<IndexRefreshCommandInfo> execute(String[] indexes); public static class IndexRefreshCommandInfo{ private final boolean hasFinished; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index fba6976..20a940f 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -325,7 +325,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { refreshIndexMeter.mark(); Observable future = producer.put(new IndexOperationMessage()); future.toBlocking().last(); - return indexRefreshCommand.execute(); + return indexRefreshCommand.execute(getUniqueIndexes()); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 0abd9c4..06e6219 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -23,9 +23,13 @@ package org.apache.usergrid.persistence.index.impl; import java.util.Map; import java.util.UUID; +import org.apache.usergrid.persistence.core.util.StringUtils; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.indices.IndexMissingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +53,7 @@ import com.codahale.metrics.Timer; import com.google.inject.Inject; import rx.Observable; +import rx.Subscriber; import rx.util.async.Async; @@ -84,7 +89,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Override - public Observable<IndexRefreshCommandInfo> execute() { + public Observable<IndexRefreshCommandInfo> execute(String[] indexes) { final long start = System.currentTimeMillis(); Timer.Context refreshTimer = timer.time(); @@ -108,9 +113,10 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); //save the item - IndexOperationMessage message = new IndexOperationMessage(); + final IndexOperationMessage message = new IndexOperationMessage(); message.addIndexRequest( indexRequest ); - producer.put( message ); + final Observable addRecord = producer.put(message); + final Observable refresh = refresh(indexes); /** * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search @@ -125,18 +131,16 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { //start our processing immediately final Observable<IndexRefreshCommandInfo> future = Async.toAsync( () -> { + final Observable combined = Observable.concat(addRecord, refresh); + combined.toBlocking().lastOrDefault(null); try { boolean found = false; for ( int i = 0; i < indexFig.maxRefreshSearches(); i++ ) { - Thread.sleep(indexFig.refreshSleep()); - final SearchResponse response = builder.execute().get(); - if (response.getHits().totalHits() > 0) { found = true; break; } - } return new IndexRefreshCommandInfo(found,System.currentTimeMillis() - start); @@ -169,4 +173,34 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { refreshTimer.stop(); }); } + + private Observable<Boolean> refresh(final String[] indexes) { + + return Observable.create(subscriber -> { + try { + + if (indexes.length == 0) { + logger.debug("Not refreshing indexes. none found"); + } + //Added For Graphite Metrics + RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); + int failedShards = response.getFailedShards(); + int successfulShards = response.getSuccessfulShards(); + ShardOperationFailedException[] sfes = response.getShardFailures(); + if (sfes != null) { + for (ShardOperationFailedException sfe : sfes) { + logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason()); + } + } + logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards); + + } catch (IndexMissingException e) { + logger.error("Unable to refresh index. Waiting before sleeping.", e); + throw e; + } + subscriber.onNext(true); + subscriber.onCompleted(); + }); + + } }
