Moving back to refresh; it's faster

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac6b394c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac6b394c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac6b394c

Branch: refs/heads/USERGRID-609
Commit: ac6b394c5059cee933a2ee7ea5dd19592338195a
Parents: 5b8a4de
Author: Shawn Feldman <[email protected]>
Authored: Thu Apr 30 13:12:11 2015 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Thu Apr 30 13:12:11 2015 -0600

----------------------------------------------------------------------
 .../index/impl/IndexRefreshCommandImpl.java     | 230 ++++++++++---------
 .../persistence/index/impl/EntityIndexTest.java |   2 +-
 2 files changed, 127 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 888a805..c8e96db 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -92,121 +92,143 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
 
     @Override
-    public synchronized Observable<IndexRefreshCommandInfo> execute(String[] 
indexes) {
-        final long start = System.currentTimeMillis();
-
+    public  Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
         Timer.Context refreshTimer = timer.time();
-        //id to hunt for
-        final UUID uuid = UUIDUtils.newTimeUUID();
-
-        final Entity entity = new Entity( new SimpleId( uuid, 
"ug_refresh_index_type" ) );
-        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-        final Id appId = new SimpleId( "ug_refresh_index" );
-        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
-        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", 
SearchEdge.NodeType.SOURCE, uuid.timestamp() );
-
-
-        final String docId = IndexingUtils.createIndexDocId( appScope, entity, 
edge );
-
-        final Map<String, Object> entityData = EntityToMapConverter.convert( 
appScope, edge, entity );
-
-        final String entityId = entityData.get( 
IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
-
-        //add a tracer record
-        IndexOperation indexRequest = new IndexOperation( 
alias.getWriteAlias(), docId, entityData );
-
-        //save the item
-        final IndexOperationMessage message = new IndexOperationMessage();
-        message.addIndexRequest( indexRequest );
-        final Observable addRecord = producer.put(message);
-        final Observable refresh = refresh(indexes);
-
-        /**
-         * We have to search.  Get by ID returns immediately, even if search 
isn't ready, therefore we have to search
-         */
-
-        final SearchRequestBuilder builder =
-            
esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
-
-                //set our filter for entityId fieldname
-        
.setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, 
entityId));
-
+        final long start = System.currentTimeMillis();
 
-        //start our processing immediately
         final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> 
{
-            final Observable<IndexRefreshCommandInfo> infoObservable = 
Observable
-                .range(0, indexFig.maxRefreshSearches())
-                .map(i ->
-                {
-                    try {
-                        return new 
IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, 
System.currentTimeMillis() - start);
-                    } catch (Exception ee) {
-                        logger.error("Failed during refresh search for " + 
uuid, ee);
-                        throw new RuntimeException("Failed during refresh 
search for " + uuid, ee);
-                    }
-                })
-                .takeWhile(info -> info.hasFinished())
-                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS );
-
-            final Observable<Boolean> combined = Observable.concat(addRecord, 
refresh);
-            combined.toBlocking().last();
-
-            final IndexRefreshCommandInfo info = 
infoObservable.toBlocking().last();
-
+            Boolean worked = refresh(indexes).toBlocking().last();
+            final IndexRefreshCommandInfo info = new 
IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start);
             return info;
 
-        },rxTaskScheduler.getAsyncIOScheduler()).call();
-
+        }, rxTaskScheduler.getAsyncIOScheduler()).call();
+        return future.doOnNext(found -> {
+            if (!found.hasFinished()) {
+                logger.error("Couldn't find record during refresh  took ms:{} 
", found.getExecutionTime());
+            } else {
+                logger.info("found record during refresh  took ms:{} ", 
found.getExecutionTime());
+            }
+        }).doOnCompleted(() -> {
+            refreshTimer.stop();
+        });
+    }
 
-            return future.doOnNext(found -> {
-                if (!found.hasFinished()) {
-                    logger.error("Couldn't find record during refresh uuid: {} 
took ms:{} ", uuid, found.getExecutionTime());
-                } else {
-                    logger.info("found record during refresh uuid: {} took 
ms:{} ", uuid, found.getExecutionTime());
-                }
-            }).doOnCompleted(() -> {
-                //clean up our data
-                String[] aliases = indexCache.getIndexes(alias, 
AliasedEntityIndex.AliasType.Read);
-                DeIndexOperation deIndexRequest =
-                    new DeIndexOperation(aliases, appScope, edge, 
entity.getId(), entity.getVersion());
-
-                //delete the item
-                IndexOperationMessage indexOperationMessage =
-                    new IndexOperationMessage();
-                indexOperationMessage.addDeIndexRequest(deIndexRequest);
-                producer.put(indexOperationMessage);
-
-                refreshTimer.stop();
-            });
-        }
+    private void insertRecord(){
+
+//        final long start = System.currentTimeMillis();
+//
+//        Timer.Context refreshTimer = timer.time();
+//        //id to hunt for
+//        final UUID uuid = UUIDUtils.newTimeUUID();
+//
+//        final Entity entity = new Entity( new SimpleId( uuid, 
"ug_refresh_index_type" ) );
+//        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+//        final Id appId = new SimpleId( "ug_refresh_index" );
+//        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
+//        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", 
SearchEdge.NodeType.SOURCE, uuid.timestamp() );
+//
+//
+//        final String docId = IndexingUtils.createIndexDocId( appScope, 
entity, edge );
+//
+//        final Map<String, Object> entityData = EntityToMapConverter.convert( 
appScope, edge, entity );
+//
+//        final String entityId = entityData.get( 
IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
+//
+//        //add a tracer record
+//        IndexOperation indexRequest = new IndexOperation( 
alias.getWriteAlias(), docId, entityData );
+//
+//        //save the item
+//        final IndexOperationMessage message = new IndexOperationMessage();
+//        message.addIndexRequest( indexRequest );
+//        final Observable addRecord = producer.put(message);
+//        final Observable refresh = refresh(indexes);
+//
+//        /**
+//         * We have to search.  Get by ID returns immediately, even if search 
isn't ready, therefore we have to search
+//         */
+//
+//        final SearchRequestBuilder builder =
+//            
esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
+//
+//                //set our filter for entityId fieldname
+//        
.setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, 
entityId));
+//
+//
+//        //start our processing immediately
+//        final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() 
-> {
+//            final Observable<IndexRefreshCommandInfo> infoObservable = 
Observable
+//                .range(0, indexFig.maxRefreshSearches())
+//                .map(i ->
+//                {
+//                    try {
+//                        return new 
IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, 
System.currentTimeMillis() - start);
+//                    } catch (Exception ee) {
+//                        logger.error("Failed during refresh search for " + 
uuid, ee);
+//                        throw new RuntimeException("Failed during refresh 
search for " + uuid, ee);
+//                    }
+//                })
+//                .takeWhile(info -> info.hasFinished())
+//                .takeLast( indexFig.refreshWaitTime(), 
TimeUnit.MILLISECONDS);
+//
+//            final Observable<Boolean> combined = 
Observable.concat(addRecord, refresh);
+//            combined.toBlocking().last();
+//
+//            final IndexRefreshCommandInfo info = 
infoObservable.toBlocking().last();
+//
+//            return info;
+//
+//        },rxTaskScheduler.getAsyncIOScheduler()).call();
+//
+//
+//            return future.doOnNext(found -> {
+//                if (!found.hasFinished()) {
+//                    logger.error("Couldn't find record during refresh uuid: 
{} took ms:{} ", uuid, found.getExecutionTime());
+//                } else {
+//                    logger.info("found record during refresh uuid: {} took 
ms:{} ", uuid, found.getExecutionTime());
+//                }
+//            }).doOnCompleted(() -> {
+//                //clean up our data
+//                String[] aliases = indexCache.getIndexes(alias, 
AliasedEntityIndex.AliasType.Read);
+//                DeIndexOperation deIndexRequest =
+//                    new DeIndexOperation(aliases, appScope, edge, 
entity.getId(), entity.getVersion());
+//
+//                //delete the item
+//                IndexOperationMessage indexOperationMessage =
+//                    new IndexOperationMessage();
+//                indexOperationMessage.addDeIndexRequest(deIndexRequest);
+//                producer.put(indexOperationMessage);
+//
+//                refreshTimer.stop();
+//            });
+    }
 
         private Observable<Boolean> refresh(final String[] indexes) {
 
-        return Observable.create(subscriber -> {
-            try {
+            return Observable.create(subscriber -> {
+                try {
 
-                if (indexes.length == 0) {
-                    logger.debug("Not refreshing indexes. none found");
-                }
-                //Added For Graphite Metrics
-                RefreshResponse response = 
esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
-                int failedShards = response.getFailedShards();
-                int successfulShards = response.getSuccessfulShards();
-                ShardOperationFailedException[] sfes = 
response.getShardFailures();
-                if (sfes != null) {
-                    for (ShardOperationFailedException sfe : sfes) {
-                        logger.error("Failed to refresh index:{} reason:{}", 
sfe.index(), sfe.reason());
+                    if (indexes.length == 0) {
+                        logger.debug("Not refreshing indexes. none found");
                     }
-                }
-                logger.debug("Refreshed indexes: {},success:{} failed:{} ", 
StringUtils.join(indexes, ", "), successfulShards, failedShards);
+                    //Added For Graphite Metrics
+                    RefreshResponse response = 
esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+                    int failedShards = response.getFailedShards();
+                    int successfulShards = response.getSuccessfulShards();
+                    ShardOperationFailedException[] sfes = 
response.getShardFailures();
+                    if (sfes != null) {
+                        for (ShardOperationFailedException sfe : sfes) {
+                            logger.error("Failed to refresh index:{} 
reason:{}", sfe.index(), sfe.reason());
+                        }
+                    }
+                    logger.debug("Refreshed indexes: {},success:{} failed:{} 
", StringUtils.join(indexes, ", "), successfulShards, failedShards);
 
-            } catch (IndexMissingException e) {
-                logger.error("Unable to refresh index. Waiting before 
sleeping.", e);
-                throw e;
-            }
-            subscriber.onNext(true);
-            subscriber.onCompleted();
-        });
+                } catch (IndexMissingException e) {
+                    logger.error("Unable to refresh index. Waiting before 
sleeping.", e);
+                    throw e;
+                }
+                subscriber.onNext(true);
+                subscriber.onCompleted();
+            });
 
-    }
+        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index fee5917..41795ad 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -284,7 +284,7 @@ public class EntityIndexTest extends BaseIT {
 
     @Test
     public void testDeleteWithAlias() throws IOException {
-        Id appId = new SimpleId( "application" );
+        Id appId = new SimpleId(UUID.randomUUID(), "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
 

Reply via email to