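Add delete back: restore application index deletion (ApplicationEntityIndex.deleteApplication) and re-enable the deleteIndex steps in PerformanceEntityRebuildIndexTest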
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf80b8bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf80b8bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf80b8bb Branch: refs/heads/USERGRID-347 Commit: cf80b8bb1196d0fedb0d3b7e7b758a4b17113794 Parents: f95a756 Author: Shawn Feldman <[email protected]> Authored: Fri Mar 20 14:55:27 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Fri Mar 20 14:55:27 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/StaleIndexCleanupTest.java | 67 ++++++---------- .../usergrid/persistence/CollectionIT.java | 10 +-- .../PerformanceEntityRebuildIndexTest.java | 71 ++++++++--------- .../index/ApplicationEntityIndex.java | 8 ++ .../impl/EsApplicationEntityIndexImpl.java | 80 ++++++++++++++++++-- .../index/impl/EsEntityIndexBatchImpl.java | 7 +- .../index/impl/EsEntityIndexImpl.java | 12 ++- 7 files changed, 159 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index 6ba1353..e27420b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -94,40 +94,35 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { public void testUpdateVersioning() throws Exception { // turn off post processing stuff that cleans up stale 
entities - System.setProperty( EVENTS_DISABLED, "true" ); + System.setProperty(EVENTS_DISABLED, "true"); final EntityManager em = app.getEntityManager(); - Entity thing = em.create( "thing", new HashMap<String, Object>() {{ - put( "name", "thing1" ); - }} ); + Entity thing = em.create("thing", new HashMap<String, Object>() {{ + put("name", "thing1"); + }}); app.refreshIndex(); - assertEquals( 1, queryCollectionCp( "things", "thing", "select *" ).size() ); + assertEquals(1, queryCollectionCp("things", "thing", "select *").size()); - org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity( thing ); + org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity(thing); UUID oldVersion = cpEntity.getVersion(); - em.updateProperties( thing, new HashMap<String, Object>() {{ - put( "stuff", "widget" ); - }} ); + em.updateProperties(thing, new HashMap<String, Object>() {{ + put("stuff", "widget"); + }}); app.refreshIndex(); - org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity( thing ); - assertEquals( "widget", cpUpdated.getField( "stuff" ).getValue() ); + org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing); + assertEquals("widget", cpUpdated.getField("stuff").getValue()); UUID newVersion = cpUpdated.getVersion(); - assertTrue( "New version is greater than old", - UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 ); + assertTrue("New version is greater than old", + UUIDComparator.staticCompare(newVersion, oldVersion) > 0); CandidateResults results; - do{ - results = queryCollectionCp( "things", "thing", "select *" ); - if(results.size()!=2){ - Thread.sleep(200); - } - }while(results.size()!=2); - assertEquals( 2, results.size() ); + results = queryCollectionCp("things", "thing", "select *"); + assertEquals(2, results.size()); } @@ -160,37 +155,28 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { CandidateResults candidateResults = null; - do{ - candidateResults = 
queryCollectionCp("things", "thing", "select * order by ordinal desc"); - if(candidateResults.size()!=2){ - Thread.sleep(200); - } - }while(candidateResults.size()<2); + + candidateResults = queryCollectionCp("things", "thing", "select * order by ordinal desc"); + if(candidateResults.size()!=2){ + Thread.sleep(200); + } + assertEquals(2, candidateResults.size()); //now run enable events and ensure we clean up System.setProperty(EVENTS_DISABLED, "false"); - Results results = null; - do{ - results = queryCollectionEm("things", "select * order by ordinal desc");; - if(results.size()!=1){ - Thread.sleep(200); - } - }while(results.size()<1); + Results results = queryCollectionEm("things", "select * order by ordinal desc"); + assertEquals( 1, results.size() ); assertEquals(1, results.getEntities().get( 0 ).getProperty( "ordinal" )); app.refreshIndex(); //ensure it's actually gone - do{ - candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" ); - if(candidateResults.size()!=1){ - Thread.sleep(200); - } - }while(candidateResults.size()!=1); + candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" ); + assertEquals(1, candidateResults.size()); assertEquals(newVersion, candidateResults.get(0).getVersion()); @@ -397,7 +383,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { //trigger the repair results = queryCollectionEm("things", "select *"); crs = queryCollectionCp("things", "thing", "select *"); - Thread.sleep(100); } while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 ); @@ -462,8 +447,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { // wait for indexes to be cleared for the deleted entities count = 0; do { - queryCollectionEm("dogs", "select *"); - Thread.sleep(100); crs = queryCollectionCp("dogs", "dog", "select *"); } while ( crs.size() != numEntities && count++ < 15 ); 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java index b1a75d9..3e1db1b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java @@ -1231,13 +1231,9 @@ public class CollectionIT extends AbstractCoreIT { query.addEqualityFilter( "rootprop1", "simpleprop" ); Entity entity; Results results; - do { - results = em.searchCollection(em.getApplicationRef(), "tests", query); - entity = results.getEntitiesMap().get(saved.getUuid()); - if (entity == null) { - Thread.sleep(200); - } - }while(entity == null); + results = em.searchCollection(em.getApplicationRef(), "tests", query); + entity = results.getEntitiesMap().get(saved.getUuid()); + assertNotNull( entity ); // query on the nested int value http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java index af31b51..4b284df 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java @@ -175,19 +175,19 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { // ----------------- delete the system and application indexes logger.debug("Deleting 
app index index"); -// //deleteIndex( CpNamingUtils.SYSTEM_APP_ID ); -// deleteIndex( em.getApplicationId() ); -// -// // ----------------- test that we can read them, should fail -// -// logger.debug("Reading data, should fail this time "); -// try { -// readData( em, "testTypes", entityCount, 0 ); -// fail("should have failed to read data"); -// -// } catch (Exception expected) {} + //deleteIndex( CpNamingUtils.SYSTEM_APP_ID ); + deleteIndex( em.getApplicationId() ); + + // ----------------- test that we can read them, should fail - // ----------------- rebuild index for catherders only + logger.debug("Reading data, should fail this time "); + try { + readData( em, "testTypes", entityCount, 0 ); + fail("should have failed to read data"); + + } catch (Exception expected) {} + +// ----------------- rebuild index for catherders only logger.debug("Preparing to rebuild all indexes");; @@ -313,20 +313,20 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { logger.debug("Deleting app index and system app index"); -// deleteIndex( em.getApplicationId() ); + deleteIndex( em.getApplicationId() ); // // // deleting sytem app index will interfere with other concurrently running tests -// //deleteIndex( CpNamingUtils.SYSTEM_APP_ID ); + //deleteIndex( CpNamingUtils.SYSTEM_APP_ID ); // // // // ----------------- test that we can read them, should fail // -// logger.debug("Reading data, should fail this time "); -// try { -// readData( em, "testTypes", entityCount, 3 ); -// fail("should have failed to read data"); -// -// } catch (Exception expected) {} + logger.debug("Reading data, should fail this time "); + try { + readData( em, "testTypes", entityCount, 3 ); + fail("should have failed to read data"); + + } catch (Exception expected) {} // ----------------- rebuild index @@ -371,23 +371,26 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { // ----------------- test that we can read them + Thread.sleep(2000); readData( em, 
"testTypes", entityCount, 3 ); } -// /** -// * Delete index for all applications, just need the one to get started. -// */ -// private void deleteIndex( UUID appUuid ) { -// -// Injector injector = SpringResource.getInstance().getBean( Injector.class ); -// EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class ); -// -// Id appId = new SimpleId( appUuid, "application"); -// ApplicationScope scope = new ApplicationScopeImpl( appId ); -// ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope); -// EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei; -// -// } + /** + * Delete index for all applications, just need the one to get started. + */ + private void deleteIndex( UUID appUuid ) { + + Injector injector = SpringResource.getInstance().getBean( Injector.class ); + EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class ); + + Id appId = new SimpleId( appUuid, "application"); + ApplicationScope scope = new ApplicationScopeImpl( appId ); + ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope); + + ei.deleteApplication().toBlocking().lastOrDefault(null); + app.refreshIndex(); + + } private int readData( EntityManager em, http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java index fab32b3..34967bd 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java @@ -19,8 +19,10 @@ */ package 
org.apache.usergrid.persistence.index; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.index.query.Query; +import org.elasticsearch.action.ListenableActionFuture; import rx.Observable; /** @@ -39,4 +41,10 @@ public interface ApplicationEntityIndex { * Execute query in Usergrid syntax. */ public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query ); + + /** + * delete all application records + * @return + */ + public Observable deleteApplication(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index c83fe41..3633c5b 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.impl; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import org.apache.commons.lang3.ArrayUtils; @@ -62,9 +64,7 @@ import rx.Observable; import rx.functions.Action1; import rx.schedulers.Schedulers; -import java.util.ArrayList; -import 
java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,6 +89,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{ private final IndexFig indexFig; private final EsProvider esProvider; private final IndexIdentifier.IndexAlias alias; + private final Timer deleteApplicationTimer; + private final Meter deleteApplicationMeter; private FailureMonitor failureMonitor; private final int cursorTimeout; @Inject @@ -109,11 +111,16 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{ mapManager = mapManagerFactory.createMapManager(mapScope); this.searchTimer = metricsFactory - .getTimer(EsEntityIndexImpl.class, "search.timer"); + .getTimer(EsApplicationEntityIndexImpl.class, "search.timer"); this.cursorTimer = metricsFactory - .getTimer(EsEntityIndexImpl.class, "search.cursor.timer"); + .getTimer(EsApplicationEntityIndexImpl.class, "search.cursor.timer"); this.cursorTimeout = config.getQueryCursorTimeout(); + this.deleteApplicationTimer = metricsFactory + .getTimer(EsApplicationEntityIndexImpl.class, "delete.application"); + this.deleteApplicationMeter = metricsFactory + .getMeter(EsApplicationEntityIndexImpl.class, "delete.application.meter"); + this.alias = indexIdentifier.getAlias(); } @@ -256,6 +263,69 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{ return parseResults(searchResponse, query); } + /** + * Completely delete an index. 
+ */ + public Observable deleteApplication() { + deleteApplicationMeter.mark(); + String idString = IndexingUtils.idString(applicationScope.getApplication()); + final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString); + Set<String> indexSet = new HashSet<>(); + List<String> reads = Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read)); + List<String> writes = Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Write)); + indexSet.addAll(reads); + indexSet.addAll(writes); + String[] indexes = indexSet.toArray(new String[0]); + Timer.Context timer = deleteApplicationTimer.time(); + //Added For Graphite Metrics + return Observable.from(indexes) + .flatMap(index -> { + + final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient() + .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(tqb).execute(); + + response.addListener(new ActionListener<DeleteByQueryResponse>() { + + @Override + public void onResponse(DeleteByQueryResponse response) { + checkDeleteByQueryResponse(tqb, response); + } + + @Override + public void onFailure(Throwable e) { + logger.error("failed on delete index", e); + } + }); + return Observable.from(response); + }) + .doOnCompleted(() -> timer.stop()); + } + + /** + * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter + */ + private void checkDeleteByQueryResponse( + final QueryBuilder query, final DeleteByQueryResponse response ) { + + for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) { + final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures(); + + for ( ShardOperationFailedException failedException : failures ) { + logger.error( String.format("Unable to delete by query %s. 
" + + "Failed with code %d and reason %s on shard %s in index %s", + query.toString(), + failedException.status().getStatus(), + failedException.reason(), + failedException.shardId(), + failedException.index() ) + ); + } + + } + } + + + private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 38bf381..7b0c3b5 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -95,9 +95,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { ValidationUtils.verifyEntityWrite( entity ); ValidationUtils.verifyVersion( entity.getVersion() ); //add app id for indexing - entity.setField( - new StringField(APPLICATION_ID_FIELDNAME, IndexingUtils.idString(applicationScope.getApplication())) - ); + final String context = createContextName(applicationScope,indexScope); if ( log.isDebugEnabled() ) { @@ -110,7 +108,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { ValidationUtils.verifyEntityWrite( entity ); Map<String, Object> entityAsMap = entityToMap( entity, context ); - + //add app id + entityAsMap.put(APPLICATION_ID_FIELDNAME, idString(applicationScope.getApplication())); // need prefix here because we index UUIDs as strings // let caller add these fields if needed 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 8bdd663..7029bba 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -64,6 +64,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.*; /** @@ -338,10 +339,13 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { @Override public boolean doOp() { try { - String[] indexes = ArrayUtils.addAll( - getIndexes(AliasType.Read), - getIndexes(AliasType.Write) - ); + + Set<String> indexSet = new HashSet<>(); + List<String> reads = Arrays.asList(getIndexes(AliasType.Read)); + List<String> writes = Arrays.asList(getIndexes(AliasType.Write)); + indexSet.addAll(reads); + indexSet.addAll(writes); + String[] indexes = indexSet.toArray(new String[0]); if ( indexes.length == 0 ) { logger.debug( "Not refreshing indexes. none found");
