trying to fix inconsistency
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0cc225f9 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0cc225f9 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0cc225f9 Branch: refs/heads/master Commit: 0cc225f94c4239d93df23ff76df1c8b179594277 Parents: 5b5fbf0 Author: Shawn Feldman <[email protected]> Authored: Thu Oct 15 18:28:26 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Oct 15 18:28:26 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CpEntityManager.java | 7 +++++-- .../corepersistence/index/IndexProcessorFig.java | 2 +- .../corepersistence/StaleIndexCleanupTest.java | 16 +++++++++++----- .../corepersistence/index/PublishRxTest.java | 2 +- .../query/IntersectionTransitivePagingIT.java | 1 + .../persistence/query/IteratingQueryIT.java | 4 ++-- .../persistence/queue/DefaultQueueManager.java | 4 ++-- 7 files changed, 23 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 82bb6bb..addd6ef 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -2898,14 +2898,17 @@ public class CpEntityManager implements EntityManager { hasFinished = true; break; } - Thread.sleep(250); + Thread.sleep(200); + indexRefreshCommandInfo + = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); } if(!hasFinished){ - logger.warn("Did not find entity {} during refresh.",refreshEntity.getUuid()); + throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid()); } }finally { delete(refreshEntity); } + return indexRefreshCommandInfo; } catch (Exception e) { throw new RuntimeException("refresh failed",e); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 410f162..ec6024f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -73,7 +73,7 @@ public interface IndexProcessorFig extends GuicyFig { /** * The number of worker threads used to read index write requests from the queue. */ - @Default( "8" ) + @Default( "16" ) @Key( ELASTICSEARCH_WORKER_COUNT ) int getWorkerCount(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index df93e68..16d9a5b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -375,10 +375,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { // turn ON post processing stuff that cleans up stale entities System.setProperty(EVENTS_DISABLED, "false"); - // delete all entities - for ( Entity thing : things ) { - em.delete( thing ); - } Thread.sleep(250); // delete happens asynchronously, wait for some time @@ -396,9 +392,19 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { do { //trigger the repair results = queryCollectionEm("things", "select *"); + results.getEntities().stream().forEach(entity -> { + try { + em.delete(entity); + }catch (Exception e){ + // + } + }); + //refresh the app index + app.refreshIndex(); + crs = queryCollectionCp("things", "thing", "select *"); - } while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 ); + } while ( crs.size() > 0 && count++ < 2000 ); Assert.assertEquals( "Expect no candidates", 0, crs.size() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java index 973a42d..c0ed57a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java @@ -52,7 +52,7 @@ public class PublishRxTest { .subscribe(); - final boolean completed = latch.await( 5, TimeUnit.SECONDS ); + final boolean completed = latch.await( 3, TimeUnit.SECONDS ); assertTrue( "publish1 behaves as expected", completed ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java index 4490a22..aee643b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java @@ -133,6 +133,7 @@ public class IntersectionTransitivePagingIT{ } this.app.refreshIndex(); + Thread.sleep(1000); return expected; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java index d882f78..e589be4 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java @@ -299,6 +299,7 @@ public class IteratingQueryIT { } app.refreshIndex(); + Thread.sleep(1000); long stop = System.currentTimeMillis(); LOG.info( "Writes took {} ms", stop - start ); @@ -387,8 +388,7 @@ public class IteratingQueryIT { results = io.getResults( query ); for ( int i = 0 ; i< results.size(); i++) { - assertEquals( expected.get( count ), results.getEntities().get( i ).getName() ); - count++; + assertEquals( expected.get( count++ ), results.getEntities().get( i ).getName() ); } query.setCursor( results.getCursor() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0cc225f9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index d9a141e..0ef2849 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -38,9 +38,9 @@ public class DefaultQueueManager implements QueueManager { public LinkedBlockingQueue<QueueMessage> queue = new LinkedBlockingQueue<>(); @Override - public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { + public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); - queue.drainTo(returnQueue,10); + queue.drainTo(returnQueue,1000); return Observable.from( returnQueue); }
