Removed search from refresh; need to use queue
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8614a68 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8614a68 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8614a68 Branch: refs/heads/master Commit: f8614a68f112469713874f032e40ab9fcc9b24d8 Parents: 2d23aa6 Author: Shawn Feldman <[email protected]> Authored: Fri Oct 9 15:28:33 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Fri Oct 9 15:28:33 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 43 ++++-- .../asyncevents/EventBuilderImpl.java | 10 +- .../org/apache/usergrid/CoreApplication.java | 14 +- .../corepersistence/StaleIndexCleanupTest.java | 8 +- .../persistence/ApplicationServiceIT.java | 9 +- .../index/impl/IndexRefreshCommandImpl.java | 134 +++---------------- .../persistence/queue/DefaultQueueManager.java | 2 +- 7 files changed, 76 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index d46c112..643a2b8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -41,6 +41,11 @@ import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.IndexRefreshCommand; +import 
org.apache.usergrid.persistence.index.utils.*; +import org.apache.usergrid.utils.*; +import org.apache.usergrid.utils.ClassUtils; +import org.apache.usergrid.utils.ConversionUtils; +import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -102,11 +107,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.utils.ClassUtils; -import org.apache.usergrid.utils.CompositeUtils; -import org.apache.usergrid.utils.Inflector; -import org.apache.usergrid.utils.StringUtils; -import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; @@ -2874,13 +2874,36 @@ public class CpEntityManager implements EntityManager { * TODO, these 3 methods are super janky. 
During refactoring we should clean this model up */ public IndexRefreshCommand.IndexRefreshCommandInfo refreshIndex() { + try { + long start = System.currentTimeMillis(); + // refresh special indexes without calling EntityManager refresh because stack overflow + Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>(); + map.put("some prop", "test"); + boolean hasFinished = false; + Entity refreshEntity = create("refresh", map); + try { + for (int i = 0; i < 10; i++) { + if (searchCollection( + new SimpleEntityRef(org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()), + InflectionUtils.pluralize("refresh"), + Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'") + ).size() > 0 + ) { + hasFinished = true; + break; + } + Thread.sleep(250); + return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); + } + }finally { + delete(refreshEntity); + } + return new IndexRefreshCommand.IndexRefreshCommandInfo(hasFinished,System.currentTimeMillis() - start); + } catch (Exception e) { + throw new RuntimeException("refresh failed",e); + } - // refresh special indexes without calling EntityManager refresh because stack overflow - - return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); } - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 4bf5695..bc72207 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -108,10 +108,12 @@ public class EventBuilderImpl implements EventBuilder { log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge ); final Observable<IndexOperationMessage> edgeObservable = - indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> { - final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return gm.deleteEdge( edge ).map( deletedEdge -> batch ); - } ); + indexService.deleteIndexEdge( applicationScope, edge ) + .map( batch -> { + final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope); + gm.deleteEdge(edge).toBlocking().lastOrDefault(null); + return batch; + } ); return edgeObservable; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java index 9c96fb8..d207894 100644 --- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java +++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java @@ -17,10 +17,7 @@ package org.apache.usergrid; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; import com.google.inject.Injector; @@ -28,8 +25,10 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.service.ApplicationService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.index.*; +import org.apache.usergrid.persistence.index.utils.MapUtils; import org.apache.usergrid.persistence.model.entity.Id; import 
org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.utils.InflectionUtils; import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; @@ -235,14 +234,11 @@ public class CoreApplication implements Application, TestRule { //Insert test entity and find it setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid()); - if(!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) { + if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) { setup.getEmf().refreshIndex(em.getApplicationId()); } - try { - Thread.sleep(2000); - }catch (Exception e){ - } + em.refreshIndex(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index 3e46e4f..df93e68 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -321,8 +321,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { final EntityManager em = app.getEntityManager(); - final int numEntities = 20; - final int numUpdates = 40; + final int numEntities = 5; + final int numUpdates = 5; // create lots of entities final List<Entity> things = new ArrayList<Entity>(numEntities); @@ -348,7 +348,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { for ( int j=0; j<numUpdates; j++) { toUpdate = em.get( thing.getUuid() ); - toUpdate.setProperty( "property" + j, RandomStringUtils.randomAlphanumeric(10)); + toUpdate.setProperty( "property" + j, 
UUID.randomUUID().toString()); em.update(toUpdate); @@ -367,9 +367,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { if(numEntities * (numUpdates + 1) == crs.size()){ break; } - Thread.sleep(250); crs = queryCollectionCp("things", "thing", "select *"); - } // Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java index d870114..f8079e5 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java @@ -76,7 +76,6 @@ public class ApplicationServiceIT extends AbstractCoreIT { count = ids.count().toBlocking().last(); Assert.assertEquals(count, 5); this.app.refreshIndex(); - Thread.sleep(5000); Injector injector = SpringResource.getInstance().getBean(Injector.class); GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class); GraphManager graphManager = factory.createEdgeManager(appScope); @@ -88,7 +87,13 @@ public class ApplicationServiceIT extends AbstractCoreIT { Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator(); if(results.hasNext()){ - Assert.fail("should be empty"); + int i = 0; + + while(results.hasNext()){ + results.next(); + i++; + } + Assert.fail("should be empty but has "+i); }else{ Results searchCollection = entityManager.searchCollection(entityManager.getApplication(), "tests", Query.all()); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 087eefe..6b8b024 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -85,118 +85,26 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { final long start = System.currentTimeMillis(); - - //id to hunt for - final UUID uuid = UUIDUtils.newTimeUUID(); - final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) ); - EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); - final Id appId = new SimpleId( "ug_refresh_index" ); - final ApplicationScope appScope = new ApplicationScopeImpl( appId ); - final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() ); - final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge ); - final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity ); - final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString(); - //add a tracer record - IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData ); - //save the item - final IndexOperationMessage message = new IndexOperationMessage(); - message.addIndexRequest( indexRequest ); - - //add the record to the index - final Observable<IndexOperationMessage> addRecord = producer.put( 
message ); - - //refresh the index - // final Observable<Boolean> refresh = refresh( indexes ); - - /** - * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search - */ - //set our filter for entityId fieldname - - - /** - * We want to search once we've added our record, then refreshed - */ - final Observable<IndexRefreshCommandInfo> searchObservable = - Observable.create(sub -> { - try { - IndexRefreshCommandInfo info = null; - for(int i = 0; i<indexFig.maxRefreshSearches();i++) { - final SearchRequestBuilder builder = esProvider.getClient().prepareSearch(alias.getReadAlias()) - .setTypes(IndexingUtils.ES_ENTITY_TYPE) - .setPostFilter(FilterBuilders - .termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, - entityId)); - - info = new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, - System.currentTimeMillis() - start); - if(info.hasFinished()){ - break; - }else { - Thread.sleep(50); - } - } - sub.onNext(info); - sub.onCompleted(); - } catch (Exception ee) { - logger.error("Failed during refresh search for " + uuid, ee); - throw new RuntimeException("Failed during refresh search for " + uuid, ee); - } - }); - - - //chain it all together - - //add the record, take it's last result. On the last add, we then execute the refresh command - - final Observable<IndexRefreshCommandInfo> refreshResults = addRecord - - //after our add, run a refresh - .doOnNext( addResult -> { - - - if ( indexes.length == 0 ) { - logger.debug( "Not refreshing indexes. 
none found" ); - } - //Added For Graphite Metrics - RefreshResponse response = - esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet(); - int failedShards = response.getFailedShards(); - int successfulShards = response.getSuccessfulShards(); - ShardOperationFailedException[] sfes = response.getShardFailures(); - if ( sfes != null ) { - for ( ShardOperationFailedException sfe : sfes ) { - logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() ); - } - } - logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ), - successfulShards, failedShards); - }) - - //once the refresh is done execute the search - .flatMap(refreshCommandResult -> searchObservable) - - //check when found - .doOnNext(found -> { - if (!found.hasFinished()) { - logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, - found.getExecutionTime()); - } else { - logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); - } - }).doOnCompleted(() -> { - //clean up our data - String[] aliases = indexCache.getIndexes(alias, EntityIndex.AliasType.Read); - DeIndexOperation deIndexRequest = - new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); - - //delete the item - IndexOperationMessage indexOperationMessage = new IndexOperationMessage(); - indexOperationMessage.addDeIndexRequest( deIndexRequest ); - producer.put( indexOperationMessage ).subscribe(); - } ); - - - return ObservableTimer.time( refreshResults, timer ) ; + if (indexes.length == 0) { + logger.debug("Not refreshing indexes. 
none found"); + } + //Added For Graphite Metrics + RefreshResponse response = + esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); + int failedShards = response.getFailedShards(); + int successfulShards = response.getSuccessfulShards(); + ShardOperationFailedException[] sfes = response.getShardFailures(); + if (sfes != null) { + for (ShardOperationFailedException sfe : sfes) { + logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason()); + } + } + logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), + successfulShards, failedShards); + + IndexRefreshCommandInfo refreshResults = new IndexRefreshCommandInfo(failedShards == 0, + System.currentTimeMillis() - start); + + return ObservableTimer.time(Observable.just(refreshResults), timer); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index f36d3c1..edd3b6b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -37,7 +37,7 @@ public class DefaultQueueManager implements QueueManager { @Override public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); - queue.drainTo(returnQueue); + queue.drainTo(returnQueue,1); return Observable.from( returnQueue); }
