adding refresh back

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a05b46d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a05b46d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a05b46d9

Branch: refs/heads/USERGRID-609
Commit: a05b46d9359e0f5a8ff81c4c790db1bc236d4484
Parents: 77a18f5
Author: Shawn Feldman <[email protected]>
Authored: Thu Apr 30 09:29:32 2015 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Thu Apr 30 09:29:32 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  8 ----
 .../persistence/index/IndexRefreshCommand.java  |  2 +-
 .../index/impl/EsEntityIndexImpl.java           |  2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 48 +++++++++++++++++---
 4 files changed, 43 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index cf91da9..e0cacb8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -190,14 +190,6 @@ public interface IndexFig extends GuicyFig {
     @Default("25")
     int maxRefreshSearches();
 
-    @Key("elasticsearch.refresh_sleep_ms")
-    @Default("100")
-    long refreshSleep();
-
-    @Default( "5000" )
-    @Key( ELASTICSEARCH_QUERY_TIMEOUT )
-    long getQueryTimeout();
-
     @Default( "5000" )
     @Key( ELASTICSEARCH_WRITE_TIMEOUT )
     long getWriteTimeout();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
index 5cc532a..03be233 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
@@ -26,7 +26,7 @@ import rx.Observable;
  */
 public interface IndexRefreshCommand {
 
-    Observable<IndexRefreshCommandInfo> execute();
+    Observable<IndexRefreshCommandInfo> execute(String[] indexes);
 
     public static class IndexRefreshCommandInfo{
         private final boolean hasFinished;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fba6976..20a940f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -325,7 +325,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
         refreshIndexMeter.mark();
         Observable future = producer.put(new IndexOperationMessage());
         future.toBlocking().last();
-        return indexRefreshCommand.execute();
+        return indexRefreshCommand.execute(getUniqueIndexes());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a05b46d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 0abd9c4..06e6219 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -23,9 +23,13 @@ package org.apache.usergrid.persistence.index.impl;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.elasticsearch.action.ShardOperationFailedException;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.indices.IndexMissingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +53,7 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 
 import rx.Observable;
+import rx.Subscriber;
 import rx.util.async.Async;
 
 
@@ -84,7 +89,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
 
     @Override
-    public Observable<IndexRefreshCommandInfo> execute() {
+    public Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
         final long start = System.currentTimeMillis();
 
         Timer.Context refreshTimer = timer.time();
@@ -108,9 +113,10 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
         IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
 
         //save the item
-        IndexOperationMessage message = new IndexOperationMessage();
+        final IndexOperationMessage message = new IndexOperationMessage();
         message.addIndexRequest( indexRequest );
-        producer.put( message );
+        final Observable addRecord = producer.put(message);
+        final Observable refresh = refresh(indexes);
 
         /**
          * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
@@ -125,18 +131,16 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
         //start our processing immediately
         final Observable<IndexRefreshCommandInfo> future = Async.toAsync( () -> {
+            final Observable combined =  Observable.concat(addRecord, refresh);
+            combined.toBlocking().lastOrDefault(null);
             try {
                 boolean found = false;
                 for ( int i = 0; i < indexFig.maxRefreshSearches(); i++ ) {
-                    Thread.sleep(indexFig.refreshSleep());
-
                     final SearchResponse response = builder.execute().get();
-
                     if (response.getHits().totalHits() > 0) {
                         found = true;
                         break;
                     }
-
                 }
 
                 return new IndexRefreshCommandInfo(found,System.currentTimeMillis() - start);
@@ -169,4 +173,34 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
             refreshTimer.stop();
         });
     }
+
+    private Observable<Boolean> refresh(final String[] indexes) {
+
+        return Observable.create(subscriber -> {
+            try {
+
+                if (indexes.length == 0) {
+                    logger.debug("Not refreshing indexes. none found");
+                }
+                //Added For Graphite Metrics
+                RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+                int failedShards = response.getFailedShards();
+                int successfulShards = response.getSuccessfulShards();
+                ShardOperationFailedException[] sfes = response.getShardFailures();
+                if (sfes != null) {
+                    for (ShardOperationFailedException sfe : sfes) {
+                        logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+                    }
+                }
+                logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
+
+            } catch (IndexMissingException e) {
+                logger.error("Unable to refresh index. Waiting before sleeping.", e);
+                throw e;
+            }
+            subscriber.onNext(true);
+            subscriber.onCompleted();
+        });
+
+    }
 }

Reply via email to