Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 5b8a4deac -> faafa3785
moving back to refresh its faster Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac6b394c Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac6b394c Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac6b394c Branch: refs/heads/two-dot-o-dev Commit: ac6b394c5059cee933a2ee7ea5dd19592338195a Parents: 5b8a4de Author: Shawn Feldman <[email protected]> Authored: Thu Apr 30 13:12:11 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Apr 30 13:12:11 2015 -0600 ---------------------------------------------------------------------- .../index/impl/IndexRefreshCommandImpl.java | 230 ++++++++++--------- .../persistence/index/impl/EntityIndexTest.java | 2 +- 2 files changed, 127 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 888a805..c8e96db 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -92,121 +92,143 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Override - public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) { - final long start = System.currentTimeMillis(); - + public Observable<IndexRefreshCommandInfo> execute(String[] indexes) { Timer.Context refreshTimer = timer.time(); - //id to hunt for - final UUID uuid = UUIDUtils.newTimeUUID(); - - final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); - EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); - final Id appId = new SimpleId( "ug_refresh_index" ); - final ApplicationScope appScope = new ApplicationScopeImpl( appId ); - final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() ); - - - final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge ); - - final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity ); - - final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString(); - - //add a tracer record - IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); - - //save the item - final IndexOperationMessage message = new IndexOperationMessage(); - message.addIndexRequest( indexRequest ); - final Observable addRecord = producer.put(message); - final Observable refresh = refresh(indexes); - - /** - * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search - */ - - final SearchRequestBuilder builder = - esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE) - - //set our filter for entityId fieldname - .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId)); - + final long start = System.currentTimeMillis(); - //start our processing immediately final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { - final Observable<IndexRefreshCommandInfo> infoObservable = Observable - .range(0, indexFig.maxRefreshSearches()) - .map(i -> - { - try { - return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); - } catch (Exception ee) { - logger.error("Failed during refresh search for " + uuid, ee); - throw new RuntimeException("Failed during refresh search for " + uuid, ee); - } - }) - .takeWhile(info -> info.hasFinished()) - .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS ); - - final Observable<Boolean> combined = Observable.concat(addRecord, refresh); - combined.toBlocking().last(); - - final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); - + Boolean worked = refresh(indexes).toBlocking().last(); + final IndexRefreshCommandInfo info = new IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start); return info; - },rxTaskScheduler.getAsyncIOScheduler()).call(); - + }, rxTaskScheduler.getAsyncIOScheduler()).call(); + return future.doOnNext(found -> { + if (!found.hasFinished()) { + logger.error("Couldn't find record during refresh took ms:{} ", found.getExecutionTime()); + } else { + logger.info("found record during refresh took ms:{} ", found.getExecutionTime()); + } + }).doOnCompleted(() -> { + refreshTimer.stop(); + }); + } - return future.doOnNext(found -> { - if (!found.hasFinished()) { - logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - } else { - logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - } - }).doOnCompleted(() -> { - //clean up our data - String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); - DeIndexOperation deIndexRequest = - new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); - - //delete the item - IndexOperationMessage indexOperationMessage = - new IndexOperationMessage(); - indexOperationMessage.addDeIndexRequest(deIndexRequest); - producer.put(indexOperationMessage); - - refreshTimer.stop(); - }); - } + private void insertRecord(){ + +// final long start = System.currentTimeMillis(); +// +// Timer.Context refreshTimer = timer.time(); +// //id to hunt for +// final UUID uuid = UUIDUtils.newTimeUUID(); +// +// final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); +// EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); +// final Id appId = new SimpleId( "ug_refresh_index" ); +// final ApplicationScope appScope = new ApplicationScopeImpl( appId ); +// final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() ); +// +// +// final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge ); +// +// final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity ); +// +// final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString(); +// +// //add a tracer record +// IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); +// +// //save the item +// final IndexOperationMessage message = new IndexOperationMessage(); +// message.addIndexRequest( indexRequest ); +// final Observable addRecord = producer.put(message); +// final Observable refresh = refresh(indexes); +// +// /** +// * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search +// */ +// +// final SearchRequestBuilder builder = +// esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE) +// +// //set our filter for entityId fieldname +// .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId)); +// +// +// //start our processing immediately +// final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> { +// final Observable<IndexRefreshCommandInfo> infoObservable = Observable +// .range(0, indexFig.maxRefreshSearches()) +// .map(i -> +// { +// try { +// return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start); +// } catch (Exception ee) { +// logger.error("Failed during refresh search for " + uuid, ee); +// throw new RuntimeException("Failed during refresh search for " + uuid, ee); +// } +// }) +// .takeWhile(info -> info.hasFinished()) +// .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS); +// +// final Observable<Boolean> combined = Observable.concat(addRecord, refresh); +// combined.toBlocking().last(); +// +// final IndexRefreshCommandInfo info = infoObservable.toBlocking().last(); +// +// return info; +// +// },rxTaskScheduler.getAsyncIOScheduler()).call(); +// +// +// return future.doOnNext(found -> { +// if (!found.hasFinished()) { +// logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); +// } else { +// logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); +// } +// }).doOnCompleted(() -> { +// //clean up our data +// String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); +// DeIndexOperation deIndexRequest = +// new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); +// +// //delete the item +// IndexOperationMessage indexOperationMessage = +// new IndexOperationMessage(); +// indexOperationMessage.addDeIndexRequest(deIndexRequest); +// producer.put(indexOperationMessage); +// +// refreshTimer.stop(); +// }); + } private Observable<Boolean> refresh(final String[] indexes) { - return Observable.create(subscriber -> { - try { + return Observable.create(subscriber -> { + try { - if (indexes.length == 0) { - logger.debug("Not refreshing indexes. none found"); - } - //Added For Graphite Metrics - RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); - int failedShards = response.getFailedShards(); - int successfulShards = response.getSuccessfulShards(); - ShardOperationFailedException[] sfes = response.getShardFailures(); - if (sfes != null) { - for (ShardOperationFailedException sfe : sfes) { - logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason()); + if (indexes.length == 0) { + logger.debug("Not refreshing indexes. none found"); } - } - logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards); + //Added For Graphite Metrics + RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); + int failedShards = response.getFailedShards(); + int successfulShards = response.getSuccessfulShards(); + ShardOperationFailedException[] sfes = response.getShardFailures(); + if (sfes != null) { + for (ShardOperationFailedException sfe : sfes) { + logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason()); + } + } + logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards); - } catch (IndexMissingException e) { - logger.error("Unable to refresh index. Waiting before sleeping.", e); - throw e; - } - subscriber.onNext(true); - subscriber.onCompleted(); - }); + } catch (IndexMissingException e) { + logger.error("Unable to refresh index. Waiting before sleeping.", e); + throw e; + } + subscriber.onNext(true); + subscriber.onCompleted(); + }); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index fee5917..41795ad 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -284,7 +284,7 @@ public class EntityIndexTest extends BaseIT { @Test public void testDeleteWithAlias() throws IOException { - Id appId = new SimpleId( "application" ); + Id appId = new SimpleId(UUID.randomUUID(), "application" ); ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
