moving back to refresh its faster
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/faafa378 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/faafa378 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/faafa378 Branch: refs/heads/two-dot-o-dev Commit: faafa378524ced0a85fd4cb3e989c79e02dda0dd Parents: ac6b394 Author: Shawn Feldman <[email protected]> Authored: Thu Apr 30 13:15:28 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Apr 30 13:15:28 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/index/IndexFig.java | 2 +- .../index/impl/IndexRefreshCommandImpl.java | 174 ++++++++----------- 2 files changed, 72 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/faafa378/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 961719d..39714f2 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -187,7 +187,7 @@ public interface IndexFig extends GuicyFig { String getClientType(); @Key("elasticsearch.refresh_search_max") - @Default("25") + @Default("10") int maxRefreshSearches(); @Key("elasticsearch.refresh_wait_ms") http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/faafa378/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index c8e96db..71a05a0 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -92,115 +92,83 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Override - public Observable<IndexRefreshCommandInfo> execute(String[] indexes) { - Timer.Context refreshTimer = timer.time(); + public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) { + + final long start = System.currentTimeMillis(); + Timer.Context refreshTimer = timer.time(); + //id to hunt for + final UUID uuid = UUIDUtils.newTimeUUID(); + final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); + EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); + final Id appId = new SimpleId( "ug_refresh_index" ); + final ApplicationScope appScope = new ApplicationScopeImpl( appId ); + final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() ); + final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge ); + final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity ); + final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString(); + //add a tracer record + IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); + //save the item + final IndexOperationMessage message = new IndexOperationMessage(); + message.addIndexRequest( indexRequest ); + final Observable addRecord = producer.put(message); + final Observable refresh = refresh(indexes); + + /** + * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search + */ + //set our filter for entityId fieldname + final SearchRequestBuilder builder = + esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE) + .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId)); + + + //start our processing immediately final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { - Boolean worked = refresh(indexes).toBlocking().last(); - final IndexRefreshCommandInfo info = new IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start); + final Observable<IndexRefreshCommandInfo> infoObservable = Observable + .range(0, indexFig.maxRefreshSearches()) + .map(i -> + { + try { + return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); + } catch (Exception ee) { + logger.error("Failed during refresh search for " + uuid, ee); + throw new RuntimeException("Failed during refresh search for " + uuid, ee); + } + }) + .takeWhile(info -> info.hasFinished()) + .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS); + + final Observable<Boolean> combined = Observable.concat(addRecord, refresh); + combined.toBlocking().last(); + final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); return info; + },rxTaskScheduler.getAsyncIOScheduler()).call(); - }, rxTaskScheduler.getAsyncIOScheduler()).call(); - return future.doOnNext(found -> { - if (!found.hasFinished()) { - logger.error("Couldn't find record during refresh took ms:{} ", found.getExecutionTime()); - } else { - logger.info("found record during refresh took ms:{} ", found.getExecutionTime()); - } - }).doOnCompleted(() -> { - refreshTimer.stop(); - }); - } - private void insertRecord(){ - -// final long start = System.currentTimeMillis(); -// -// Timer.Context refreshTimer = timer.time(); -// //id to hunt for -// final UUID uuid = UUIDUtils.newTimeUUID(); -// -// final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); -// EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); -// final Id appId = new SimpleId( "ug_refresh_index" ); -// final ApplicationScope appScope = new ApplicationScopeImpl( appId ); -// final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() ); -// -// -// final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge ); -// -// final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity ); -// -// final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString(); -// -// //add a tracer record -// IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); -// -// //save the item -// final IndexOperationMessage message = new IndexOperationMessage(); -// message.addIndexRequest( indexRequest ); -// final Observable addRecord = producer.put(message); -// final Observable refresh = refresh(indexes); -// -// /** -// * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search -// */ -// -// final SearchRequestBuilder builder = -// esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE) -// -// //set our filter for entityId fieldname -// .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId)); -// -// -// //start our processing immediately -// final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { -// final Observable<IndexRefreshCommandInfo> infoObservable = Observable -// .range(0, indexFig.maxRefreshSearches()) -// .map(i -> -// { -// try { -// return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); -// } catch (Exception ee) { -// logger.error("Failed during refresh search for " + uuid, ee); -// throw new RuntimeException("Failed during refresh search for " + uuid, ee); -// } -// }) -// .takeWhile(info -> info.hasFinished()) -// .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS); -// -// final Observable<Boolean> combined = Observable.concat(addRecord, refresh); -// combined.toBlocking().last(); -// -// final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); -// -// return info; -// -// },rxTaskScheduler.getAsyncIOScheduler()).call(); -// -// -// return future.doOnNext(found -> { -// if (!found.hasFinished()) { -// logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); -// } else { -// logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); -// } -// }).doOnCompleted(() -> { -// //clean up our data -// String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); -// DeIndexOperation deIndexRequest = -// new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); -// -// //delete the item -// IndexOperationMessage indexOperationMessage = -// new IndexOperationMessage(); -// indexOperationMessage.addDeIndexRequest(deIndexRequest); -// producer.put(indexOperationMessage); -// -// refreshTimer.stop(); -// }); - } + return future.doOnNext(found -> { + if (!found.hasFinished()) { + logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); + } else { + logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); + } + }).doOnCompleted(() -> { + //clean up our data + String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); + DeIndexOperation deIndexRequest = + new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); + + //delete the item + IndexOperationMessage indexOperationMessage = + new IndexOperationMessage(); + indexOperationMessage.addDeIndexRequest(deIndexRequest); + producer.put(indexOperationMessage); + + refreshTimer.stop(); + }); + } private Observable<Boolean> refresh(final String[] indexes) {
