Fixes bugs and cleans up tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48be894f Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48be894f Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48be894f Branch: refs/heads/USERGRID-608 Commit: 48be894fb8af857f33700fcc2717357936d12913 Parents: a767bb6 Author: Todd Nine <tn...@apigee.com> Authored: Thu May 14 19:23:04 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu May 14 19:23:04 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 4 + .../corepersistence/CpRelationManager.java | 1 - .../index/IndexProcessorFig.java | 2 +- .../index/IndexServiceRequestBuilderImpl.java | 74 ++--- .../corepersistence/index/ReIndexService.java | 16 +- .../index/ReIndexServiceImpl.java | 48 ++- .../cursor/AbstractCursorSerializer.java | 2 +- .../PerformanceEntityRebuildIndexTest.java | 318 +++++++++---------- 8 files changed, 225 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index a02bffd..b22a7cb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -25,6 +25,8 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.index.IndexServiceImpl; +import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl; import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin; import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; @@ -142,6 +144,8 @@ public class CoreModule extends AbstractModule { bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class ); + bind( ReIndexService.class).to( ReIndexServiceImpl.class ); + install( new GuicyFigModule( IndexProcessorFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 4993d88..cba1a07 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -661,7 +661,6 @@ public class CpRelationManager implements RelationManager { }while (!found && length <= maxLength); if(logger.isInfoEnabled()){ logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults)); - Thread.dumpStack(); } return results; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 8e835e2..e4b2329 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -78,7 +78,7 @@ public interface IndexProcessorFig extends GuicyFig { String getQueueImplementation(); - @Default("10000") + @Default("1000") @Key("elasticsearch.reindex.flush.interval") int getUpdateInterval(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java index 3466674..4017b6e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java @@ -22,70 +22,57 @@ package org.apache.usergrid.corepersistence.index; import java.util.UUID; - import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import com.google.common.base.Optional; +/** + * Index service request builder + */ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder { - - /** - * - final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData(); - - final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); - - //create an observable that loads each entity and indexes it, start it running with publish - final ConnectableObservable<EdgeScope> runningReIndex = - allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp ) - - //for each edge, create our scope and index on it - .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish(); - - - - //start our sampler and state persistence - //take a sample every sample interval to allow us to resume state with minimal loss - runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, - rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( edge -> { - - final String serializedState = SerializableMapper.asString( edge ); - - mapManager.putString( newCursor, serializedState, INDEX_TTL ); - } ).subscribe(); - - - */ - - private Optional<UUID> withApplicationId; - private Optional<String> withCollectionName; - private Optional<String> cursor; - private Optional<Long> updateTimestamp; + private Optional<UUID> withApplicationId = Optional.absent(); + private Optional<String> withCollectionName = Optional.absent(); + private Optional<String> cursor = Optional.absent(); + private Optional<Long> updateTimestamp = Optional.absent(); /*** * - * @param applicationId + * @param applicationId The application id * @return */ @Override public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) { - this.withApplicationId = Optional.fromNullable(applicationId); + this.withApplicationId = Optional.fromNullable( applicationId ); return this; } + /** + * the colleciton name + * @param collectionName + * @return + */ @Override public IndexServiceRequestBuilder withCollection( final String collectionName ) { - this.withCollectionName = Optional.fromNullable( collectionName ); + if(collectionName == null){ + this.withCollectionName = Optional.absent(); + } + else { + this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) ); + } return this; } + /** + * The cursor + * @param cursor + * @return + */ @Override public IndexServiceRequestBuilder withCursor( final String cursor ) { this.cursor = Optional.fromNullable( cursor ); @@ -93,9 +80,14 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde } + /** + * Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing + * @param timestamp + * @return + */ @Override public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) { - this.updateTimestamp = Optional.fromNullable(timestamp ); + this.updateTimestamp = Optional.fromNullable( timestamp ); return this; } @@ -103,8 +95,8 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde @Override public Optional<ApplicationScope> getApplicationScope() { - if(this.withApplicationId.isPresent()){ - return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get())); + if ( this.withApplicationId.isPresent() ) { + return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) ); } return Optional.absent(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index f8955dd..af3615e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -31,7 +31,7 @@ public interface ReIndexService { * * @param indexServiceRequestBuilder The builder to build the request */ - IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ); + ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ); /** @@ -45,20 +45,20 @@ public interface ReIndexService { * @param jobId The jobId returned during the rebuild index * @return */ - IndexResponse getStatus( final String jobId ); + ReIndexStatus getStatus( final String jobId ); /** * The response when requesting a re-index operation */ - class IndexResponse { + class ReIndexStatus { final String jobId; - final String status; + final Status status; final long numberProcessed; final long lastUpdated; - public IndexResponse( final String jobId, final String status, final long numberProcessed, + public ReIndexStatus( final String jobId, final Status status, final long numberProcessed, final long lastUpdated ) { this.jobId = jobId; this.status = status; @@ -97,8 +97,12 @@ public interface ReIndexService { * Get the status * @return */ - public String getStatus() { + public Status getStatus() { return status; } } + + enum Status{ + STARTED, INPROGRESS, COMPLETE; + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index d828fc2..f44113b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -22,6 +22,10 @@ package org.apache.usergrid.corepersistence.index; import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil; import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek; @@ -51,6 +55,8 @@ import rx.schedulers.Schedulers; @Singleton public class ReIndexServiceImpl implements ReIndexService { + private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class ); + private static final MapScope RESUME_MAP_SCOPE = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" ); @@ -85,7 +91,7 @@ public class ReIndexServiceImpl implements ReIndexService { @Override - public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) { + public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) { //load our last emitted Scope if a cursor is present @@ -97,7 +103,7 @@ public class ReIndexServiceImpl implements ReIndexService { final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope(); - Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(), + Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()), "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" ); final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId ); @@ -112,9 +118,14 @@ public class ReIndexServiceImpl implements ReIndexService { indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) //for each edge, create our scope and index on it - .doOnNext( edge -> indexService.index( - new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), - modifiedSince ) ) ); + .doOnNext( edge -> { + final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince ); + + logger.info( "Queueing {}", entityIndexOperation ); + + indexService.index(entityIndexOperation); + + } ); //start our sampler and state persistence @@ -127,7 +138,7 @@ public class ReIndexServiceImpl implements ReIndexService { .subscribeOn( Schedulers.io() ).subscribe(); - return new IndexResponse( jobId, "Started", 0, 0 ); + return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); } @@ -138,7 +149,7 @@ public class ReIndexServiceImpl implements ReIndexService { @Override - public IndexResponse getStatus( final String jobId ) { + public ReIndexStatus getStatus( final String jobId ) { Preconditions.checkNotNull( jobId, "jobId must not be null" ); return getIndexResponse( jobId ); } @@ -166,11 +177,11 @@ public class ReIndexServiceImpl implements ReIndexService { writeCursorState( jobId, buffer.get( buffer.size() - 1 ) ); } - writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() ); + writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() ); } public void complete(){ - writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() ); + writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() ); } } @@ -257,10 +268,15 @@ public class ReIndexServiceImpl implements ReIndexService { * @param processedCount * @param lastUpdated */ - private void writeStateMeta( final String jobId, final String status, final long processedCount, + private void writeStateMeta( final String jobId, final Status status, final long processedCount, final long lastUpdated ) { - mapManager.putString( jobId + MAP_STATUS_KEY, status ); + if(logger.isDebugEnabled()) { + logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}", + new Object[] { jobId, status, processedCount, lastUpdated } ); + } + + mapManager.putString( jobId + MAP_STATUS_KEY, status.name() ); mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount ); mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated ); } @@ -271,18 +287,20 @@ public class ReIndexServiceImpl implements ReIndexService { * @param jobId * @return */ - private IndexResponse getIndexResponse( final String jobId ) { + private ReIndexStatus getIndexResponse( final String jobId ) { - final String status = mapManager.getString( jobId+MAP_STATUS_KEY ); + final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY ); - if(status == null){ + if(stringStatus == null){ throw new IllegalArgumentException( "Could not find a job with id " + jobId ); } + final Status status = Status.valueOf( stringStatus ); + final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY ); final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY ); - return new IndexResponse( jobId, status, processedCount, lastUpdated ); + return new ReIndexStatus( jobId, status, processedCount, lastUpdated ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java index 23bb99a..e770a77 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java @@ -44,7 +44,7 @@ public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T> try { final Class<? extends T> classType = getType(); - return objectMapper.treeToValue( node, classType ); + return objectMapper.treeToValue( node, classType ); } catch ( JsonProcessingException e ) { throw new CursorParseException( "Unable to deserialize value", e ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java index 55c4846..8d54043 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java @@ -22,78 +22,67 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import com.google.common.base.Optional; -import org.apache.commons.lang.RandomStringUtils; - -import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder; -import org.apache.usergrid.corepersistence.index.ReIndexService; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.RandomStringUtils; + import org.apache.usergrid.AbstractCoreIT; import org.apache.usergrid.cassandra.SpringResource; +import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder; +import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Slf4jReporter; import com.google.inject.Injector; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; //@RunWith(JukitoRunner.class) //@UseModules({ GuiceModule.class }) + public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { - private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class ); + private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class ); private static final MetricRegistry registry = new MetricRegistry(); - private Slf4jReporter reporter; - - private static final int ENTITIES_TO_INDEX = 2000; + private static final int ENTITIES_TO_INDEX = 1000; @Before public void startReporting() { - logger.debug("Starting metrics reporting"); - reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger ) - .convertRatesTo( TimeUnit.SECONDS ) - .convertDurationsTo( TimeUnit.MILLISECONDS ).build(); - - reporter.start( 10, TimeUnit.SECONDS ); + logger.debug( "Starting metrics reporting" ); } @After public void printReport() { - logger.debug("Printing metrics report"); - reporter.report(); - reporter.stop(); + logger.debug( "Printing metrics report" ); } - @Test + @Test( timeout = 120000 ) public void rebuildOneCollectionIndex() throws Exception { - logger.info("Started rebuildIndex()"); + logger.info( "Started rebuildIndex()" ); - String rand = RandomStringUtils.randomAlphanumeric(5); - final UUID appId = setup.createApplication("org_" + rand, "app_" + rand); + String rand = RandomStringUtils.randomAlphanumeric( 5 ); + final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand ); final EntityManager em = setup.getEmf().getEntityManager( appId ); @@ -102,109 +91,80 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { // ----------------- create a bunch of entities Map<String, Object> entityMap = new HashMap<String, Object>() {{ - put("key1", 1000 ); - put("key2", 2000 ); - put("key3", "Some value"); + put( "key1", 1000 ); + put( "key2", 2000 ); + put( "key3", "Some value" ); }}; List<EntityRef> entityRefs = new ArrayList<EntityRef>(); - int herderCount = 0; + int herderCount = 0; int shepardCount = 0; - for (int i = 0; i < ENTITIES_TO_INDEX; i++) { + for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) { final Entity entity; try { - entityMap.put("key", i ); + entityMap.put( "key", i ); if ( i % 2 == 0 ) { - entity = em.create("catherder", entityMap); + entity = em.create( "catherder", entityMap ); herderCount++; - } else { - entity = em.create("catshepard", entityMap); + } + else { + entity = em.create( "catshepard", entityMap ); shepardCount++; } - - app.refreshIndex(); - -// em.createConnection(entity, "herds", cat1); -// em.createConnection(entity, "herds", cat2); -// em.createConnection(entity, "herds", cat3); - - } catch (Exception ex) { - throw new RuntimeException("Error creating entity", ex); + } + catch ( Exception ex ) { + throw new RuntimeException( "Error creating entity", ex ); } - entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) ); + entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) ); if ( i % 10 == 0 ) { - logger.info("Created {} entities", i ); + logger.info( "Created {} entities", i ); } - } - logger.info("Created {} entities", ENTITIES_TO_INDEX); + logger.info( "Created {} entities", ENTITIES_TO_INDEX ); app.refreshIndex(); // ----------------- test that we can read them, should work fine - logger.debug("Read the data"); - readData( em, "catherders", herderCount, 0); - readData( em, "catshepards", shepardCount, 0); + logger.debug( "Read the data" ); + readData( em, "catherders", herderCount, 0 ); + readData( em, "catshepards", shepardCount, 0 ); // ----------------- delete the system and application indexes - logger.debug("Deleting apps"); + logger.debug( "Deleting apps" ); deleteIndex( em.getApplicationId() ); // ----------------- test that we can read them, should fail - logger.debug("Reading data, should fail this time "); - try { - readData( em, "testTypes", ENTITIES_TO_INDEX, 0 ); - fail("should have failed to read data"); + logger.debug( "Reading data, should fail this time " ); - } catch (Exception expected) {} + //should be no data + readData( em, "testTypes", 0, 0 ); -// ----------------- rebuild index for catherders only - logger.debug("Preparing to rebuild all indexes");; + // ----------------- rebuild index for catherders only - final String meterName = this.getClass().getSimpleName() + ".rebuildIndex"; - final Meter meter = registry.meter( meterName ); + logger.debug( "Preparing to rebuild all indexes" ); - EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() { - int counter = 0; - @Override - public void onProgress( final EntityRef entity ) { - - meter.mark(); - logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid()); - if ( counter % 100 == 0 ) { - logger.info("Reindexed {} entities", counter ); - } - counter++; - } + final IndexServiceRequestBuilder builder = + reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" ); + ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder ); + assertNotNull( status.getJobId(), "JobId is present" ); - }; - - try { + logger.info( "Rebuilt index" ); - final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" ); - reIndexService.rebuildIndex(builder ); + waitForRebuild( status, reIndexService ); - reporter.report(); - registry.remove( meterName ); - logger.info("Rebuilt index"); - - } catch (Exception ex) { - logger.error("Error rebuilding index", ex); - fail(); - } // ----------------- test that we can read the catherder collection and not the catshepard @@ -213,78 +173,79 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { } - @Test + @Test( timeout = 120000 ) public void rebuildIndex() throws Exception { - logger.info("Started rebuildIndex()"); + logger.info( "Started rebuildIndex()" ); + + String rand = RandomStringUtils.randomAlphanumeric( 5 ); + final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand ); - String rand = RandomStringUtils.randomAlphanumeric(5); - final UUID appId = setup.createApplication("org_" + rand, "app_" + rand); + final EntityManager em = setup.getEmf().getEntityManager( appId ); - final EntityManager em = setup.getEmf().getEntityManager(appId); + final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class ); // ----------------- create a bunch of entities Map<String, Object> entityMap = new HashMap<String, Object>() {{ - put("key1", 1000 ); - put("key2", 2000 ); - put("key3", "Some value"); + put( "key1", 1000 ); + put( "key2", 2000 ); + put( "key3", "Some value" ); }}; Map<String, Object> cat1map = new HashMap<String, Object>() {{ - put("name", "enzo"); - put("color", "orange"); + put( "name", "enzo" ); + put( "color", "orange" ); }}; Map<String, Object> cat2map = new HashMap<String, Object>() {{ - put("name", "marquee"); - put("color", "grey"); + put( "name", "marquee" ); + put( "color", "grey" ); }}; Map<String, Object> cat3map = new HashMap<String, Object>() {{ - put("name", "bertha"); - put("color", "tabby"); + put( "name", "bertha" ); + put( "color", "tabby" ); }}; - Entity cat1 = em.create("cat", cat1map ); - Entity cat2 = em.create("cat", cat2map ); - Entity cat3 = em.create("cat", cat3map ); + Entity cat1 = em.create( "cat", cat1map ); + Entity cat2 = em.create( "cat", cat2map ); + Entity cat3 = em.create( "cat", cat3map ); - List<EntityRef> entityRefs = new ArrayList<EntityRef>(); - int entityCount = 0; - for (int i = 0; i < ENTITIES_TO_INDEX; i++) { + List<EntityRef> entityRefs = new ArrayList<>(); + + for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) { final Entity entity; try { - entityMap.put("key", entityCount ); - entity = em.create("testType", entityMap ); + entityMap.put( "key", i ); + entity = em.create( "testType", entityMap ); - em.createConnection(entity, "herds", cat1); - em.createConnection(entity, "herds", cat2); - em.createConnection(entity, "herds", cat3); - - } catch (Exception ex) { - throw new RuntimeException("Error creating entity", ex); + em.createConnection( entity, "herds", cat1 ); + em.createConnection( entity, "herds", cat2 ); + em.createConnection( entity, "herds", cat3 ); } - - entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) ); - if ( entityCount % 10 == 0 ) { - logger.info("Created {} entities", entityCount ); + catch ( Exception ex ) { + throw new RuntimeException( "Error creating entity", ex ); } + entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) ); + if ( i % 10 == 0 ) { + logger.info( "Created {} entities", i ); + } } - logger.info("Created {} entities", entityCount); + logger.info( "Created {} entities", ENTITIES_TO_INDEX ); app.refreshIndex(); - Thread.sleep(10000); // ----------------- test that we can read them, should work fine - logger.debug("Read the data"); - readData( em, "testType", entityCount, 3 ); + logger.debug( "Read the data" ); + final String collectionName = "testtypes"; + readData( em, collectionName, ENTITIES_TO_INDEX, 3 ); // ----------------- delete the system and application indexes - logger.debug("Deleting app index"); + logger.debug( "Deleting app index" ); deleteIndex( em.getApplicationId() ); @@ -295,61 +256,73 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { // ----------------- test that we can read them, should fail - logger.debug("Reading data, should fail this time "); - try { - readData( em, "testTypes", entityCount, 3 ); - fail("should have failed to read data"); + logger.debug( "Reading data, should fail this time " ); + + readData( em, collectionName, 0, 0 ); + - } catch (Exception expected) {} // ----------------- rebuild index - logger.debug("Preparing to rebuild all indexes");; + logger.debug( "Preparing to rebuild all indexes" ); + ; - final String meterName = this.getClass().getSimpleName() + ".rebuildIndex"; - final Meter meter = registry.meter( meterName ); - EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() { - int counter = 0; + try { - @Override - public void onProgress( final EntityRef entity ) { + final IndexServiceRequestBuilder builder = + reIndexService.getBuilder().withApplicationId( em.getApplicationId() ); - meter.mark(); - logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid()); - if ( counter % 100 == 0 ) { - logger.info("Reindexed {} entities", counter ); - } - counter++; - } + ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder ); + assertNotNull( status.getJobId(), "JobId is present" ); - }; + logger.info( "Rebuilt index" ); - try { - fail( "Implement index rebuild" ); -// setup.getEmf().rebuildInternalIndexes( po ); -// -// setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po ); + waitForRebuild( status, reIndexService ); - reporter.report(); - registry.remove( meterName ); - logger.info("Rebuilt index"); - app.refreshIndex(); + logger.info( "Rebuilt index" ); - } catch (Exception ex) { - logger.error("Error rebuilding index", ex); + app.refreshIndex(); + } + catch ( Exception ex ) { + logger.error( "Error rebuilding index", ex ); fail(); } // ----------------- test that we can read them - Thread.sleep(2000); - readData( em, "testTypes", entityCount, 3 ); + Thread.sleep( 2000 ); + readData( em, collectionName, ENTITIES_TO_INDEX, 3 ); + } + + + /** + * Wait for the rebuild to occur + */ + private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService ) + throws InterruptedException { + while ( true ) { + + try { + final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() ); + + if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) { + break; + } + } + catch ( IllegalArgumentException iae ) { + //swallow + } + + + Thread.sleep( 1000 ); + } } + /** * Delete app index */ @@ -360,54 +333,49 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION ); ApplicationScope scope = new ApplicationScopeImpl( appId ); - ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope); + ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope ); - ei.deleteApplication().toBlocking().lastOrDefault(null); + ei.deleteApplication().toBlocking().lastOrDefault( null ); app.refreshIndex(); - } - private int readData( EntityManager em, - String collectionName, int expectedEntities, int expectedConnections ) throws Exception { + private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections ) + throws Exception { app.refreshIndex(); - Query q = Query.fromQL("select * where key1=1000"); - q.setLimit(40); - Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q,expectedEntities ); + Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 ); + Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities ); int count = 0; while ( true ) { for ( Entity e : results.getEntities() ) { - assertEquals( 2000, e.getProperty("key2")); + assertEquals( 2000, e.getProperty( "key2" ) ); - Results catResults = em.searchTargetEntities(e, - Query.fromQL("select *").setConnectionType("herds")); + Results catResults = + em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) ); assertEquals( expectedConnections, catResults.size() ); if ( count % 100 == 0 ) { - logger.info( "read {} entities", count); + logger.info( "read {} entities", count ); } count++; } if ( results.hasCursor() ) { - logger.info( "Counted {} : query again with cursor", count); + logger.info( "Counted {} : query again with cursor", count ); q.setCursor( results.getCursor() ); results = em.searchCollection( em.getApplicationRef(), collectionName, q ); - - } else { + } + else { break; } } - if ( expectedEntities != -1 && expectedEntities != count ) { - throw new RuntimeException("Did not get expected " - + expectedEntities + " entities, instead got " + count ); - } + assertEquals("Did not get expected entities", expectedEntities, count); return count; } }