Moved delete by query into index, since they cannot be batched. Fixed version to be a reserved field and updated mapping.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/43105f12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/43105f12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/43105f12 Branch: refs/heads/two-dot-o-events Commit: 43105f126dbe17b4446870e7760260829ef92ad9 Parents: 73a3bcc Author: Todd Nine <[email protected]> Authored: Tue Nov 18 17:19:19 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Tue Nov 18 17:19:19 2014 -0700 ---------------------------------------------------------------------- .../events/EntityDeletedHandler.java | 5 +- .../events/EntityVersionCreatedHandler.java | 1 + .../usergrid/persistence/index/EntityIndex.java | 21 ++++- .../persistence/index/EntityIndexBatch.java | 13 ---- .../index/impl/EsEntityIndexBatchImpl.java | 66 ++++------------ .../index/impl/EsEntityIndexImpl.java | 81 ++++++++++++++++++-- .../persistence/index/impl/IndexingUtils.java | 19 ++++- 7 files changed, 127 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java index b5cb1ff..bd738fe 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java @@ -55,9 +55,6 @@ public class EntityDeletedHandler implements EntityDeleted { final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope); - EntityIndexBatch 
batch = ei.createBatch(); - - batch.deleteEntity( entityId ); - batch.execute(); + ei.deleteAllVersionsOfEntity( entityId ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java index 94a673a..08b8fbd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java @@ -58,6 +58,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated { EntityIndexBatch batch = ei.createBatch(); + //TODO why aren't we using a collection fig here? 
This seems kludgy if ( System.getProperty( "allow.stale.entities", "false" ).equals( "false" )) { batch.deindexPreviousVersions( entity ); batch.execute(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index 88498b3..29d8015 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -19,9 +19,12 @@ package org.apache.usergrid.persistence.index; +import java.util.UUID; + import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.index.query.Query; import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -48,8 +51,24 @@ public interface EntityIndex { /** * Get the candidate results of all versions of the entity for this id. + * @param indexScope The scope of the index to search in + * @param id The id to search within. + */ + public CandidateResults getEntityVersions(final IndexScope indexScope, final Id id); + + /** + * Create a delete method that deletes by Id. 
This will delete all documents from ES with the same entity Id, + * effectively removing all versions of an entity from all index scopes + * @param entityId The entityId to remove + */ + public void deleteAllVersionsOfEntity(final Id entityId ); + + /** + * Takes all the previous versions of the current entity and deletes all previous versions + * @param id The id to remove + * @param version The max version to retain */ - public CandidateResults getEntityVersions(final IndexScope indexScope, Id id); + public void deletePreviousVersions(final Id id, final UUID version); /** * Refresh the index. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index bd1ee40..68008bf 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -56,19 +56,6 @@ public interface EntityIndexBatch { */ public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version); - /** - * Create a delete method that deletes by Id. This will delete all documents from ES with - * the same entity Id, effectively removing all versions of an entity from all index scopes. 
- */ - public EntityIndexBatch deleteEntity( Id entityId ); - - /** - * Takes all the previous versions of the current entity and deindexs all previous versions - * @param entity - * @return - */ - public EntityIndexBatch deindexPreviousVersions(Entity entity); - /** * Execute the batch http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index f69a64c..e0ec1ed 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -29,7 +29,14 @@ import java.util.UUID; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest; +import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; +import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.index.query.FilteredQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,18 +64,17 @@ import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.field.UUIDField; import 
org.apache.usergrid.persistence.model.field.value.EntityObject; -import com.google.common.base.Joiner; - import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX; -import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_ID_FIELDNAME; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_CONTEXT_FIELDNAME; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_ID_FIELDNAME; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_VERSION_FIELDNAME; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexName; -import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName; public class EsEntityIndexBatchImpl implements EntityIndexBatch { @@ -167,7 +173,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { } - log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId); + log.debug( "De-indexing type {} with documentId '{}'", entityType, indexId ); bulkRequest.add( client.prepareDelete( indexName, entityType, indexId ).setRefresh( refresh ) ); @@ -192,43 +198,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { return deindex( indexScope, entity.getId(), entity.getVersion() ); } - - @Override - public EntityIndexBatch deleteEntity(Id entityId) { - - TermQueryBuilder tqb = QueryBuilders.termQuery( - STRING_PREFIX + 
ENTITYID_FIELDNAME, entityId.getUuid().toString().toLowerCase()); - - DeleteByQueryResponse response = client.prepareDeleteByQuery( indexName ) - .setQuery( tqb ).execute().actionGet(); - - logger.debug("Deleted entity {}:{} from all index scopes with response status = {}", - new Object[] { entityId.getType(), entityId.getUuid(), response.status().toString() }); - - maybeFlush(); - - return this; - } - - - @Override - public EntityIndexBatch deindexPreviousVersions( Entity entity ) { - - FilteredQueryBuilder fqb = QueryBuilders.filteredQuery( QueryBuilders - .termQuery( STRING_PREFIX + ENTITYID_FIELDNAME, - entity.getId().getUuid().toString().toLowerCase() ), - FilterBuilders.rangeFilter( ENTITYVERSION_FIELDNAME ).lt( entity.getVersion().timestamp() ) ); - - DeleteByQueryResponse response = client.prepareDeleteByQuery( indexName ).setQuery( fqb ).execute().actionGet(); - - //error message needs to be retooled so that it describes the entity more throughly - logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}", - new Object[] { entity.getId().getType(), entity.getId().getUuid(), response.status().toString() } ); - - maybeFlush(); - - return this; - } @Override @@ -251,9 +220,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { try { responses = request.execute().actionGet(); - } catch (Throwable t) { - logger.error("Unable to communicate with elasticsearch"); - failureMonitor.fail("Unable to execute batch", t); } catch ( Throwable t ) { log.error( "Unable to communicate with elasticsearch" ); @@ -293,9 +259,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { /** * Set the entity as a map with the context + * * @param entity The entity * @param context The context this entity appears in - * @return */ private static Map entityToMap( final Entity entity, final String context ) { final Map entityMap = entityToMap( entity ); @@ -304,8 +270,11 @@ public class EsEntityIndexBatchImpl implements 
EntityIndexBatch { entityMap.put( ENTITY_CONTEXT_FIELDNAME, context ); //but the fieldname - //we have to prefix because we use query equality to seek this later. TODO see if we can make this more declarative - entityMap.put( ENTITYID_ID_FIELDNAME, IndexingUtils.idString(entity.getId()).toLowerCase() ); + //we have to prefix because we use query equality to seek this later. TODO see if we can make this more + // declarative + entityMap.put( ENTITY_ID_FIELDNAME, IndexingUtils.idString( entity.getId() ).toLowerCase() ); + + entityMap.put( ENTITY_ID_FIELDNAME, IndexingUtils.idString( entity.getId() ).toLowerCase() ); return entityMap; } @@ -428,5 +397,4 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { private void initBatch() { this.bulkRequest = client.prepareBulk(); } - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index b1e5374..406c967 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; @@ -31,6 +32,8 @@ 
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; +import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequestBuilder; @@ -40,11 +43,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.index.query.FilteredQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -65,6 +72,7 @@ import org.apache.usergrid.persistence.index.exceptions.IndexException; import org.apache.usergrid.persistence.index.query.CandidateResult; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import 
org.apache.usergrid.persistence.model.util.UUIDGenerator; @@ -74,7 +82,8 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX; -import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_ID_FIELDNAME; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_ID_FIELDNAME; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_VERSION_FIELDNAME; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.SPLITTER; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX; @@ -221,11 +230,9 @@ public class EsEntityIndexImpl implements EntityIndex { xcb ) // set mapping as the default for all types .execute().actionGet(); - if(!pitr.isAcknowledged()){ + if ( !pitr.isAcknowledged() ) { throw new IndexException( "Unable to create default mappings" ); } - - } @@ -251,7 +258,6 @@ public class EsEntityIndexImpl implements EntityIndex { .setScroll( cursorTimeout + "m" ).setQuery( qb ); - final FilterBuilder fb = query.createFilterBuilder(); @@ -300,7 +306,6 @@ public class EsEntityIndexImpl implements EntityIndex { } - if ( logger.isDebugEnabled() ) { logger.debug( "Searching index {}\n scope{} \n type {}\n query {} ", new Object[] { this.indexName, context, entityTypes, srb @@ -412,7 +417,9 @@ public class EsEntityIndexImpl implements EntityIndex { @Override public int getPendingTasks() { - final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin().cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet(); + final PendingClusterTasksResponse tasksResponse = + esProvider.getClient().admin().cluster().pendingClusterTasks( new PendingClusterTasksRequest() ) + .actionGet(); return tasksResponse.pendingTasks().size(); } @@ -447,7 
+454,65 @@ public class EsEntityIndexImpl implements EntityIndex { failureMonitor.success(); - return parseResults( searchResponse, new Query( ) ); + return parseResults( searchResponse, new Query() ); + } + + + @Override + public void deleteAllVersionsOfEntity( Id entityId ) { + + final TermQueryBuilder tqb = + QueryBuilders.termQuery( ENTITY_ID_FIELDNAME, entityId.getUuid().toString().toLowerCase() ); + + + final DeleteByQueryResponse response = + esProvider.getClient().prepareDeleteByQuery( indexName ).setQuery( tqb ).execute().actionGet(); + + + logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}", + new Object[] { entityId.getType(), entityId.getUuid(), response.status().toString() } ); + + checkDeleteByQueryResponse( tqb, response ); + + } + + + @Override + public void deletePreviousVersions( final Id id, final UUID version ) { + + final FilteredQueryBuilder fqb = QueryBuilders.filteredQuery( + QueryBuilders.termQuery( ENTITY_ID_FIELDNAME, id.getUuid().toString().toLowerCase() ), + + FilterBuilders.rangeFilter( ENTITY_VERSION_FIELDNAME ).lt( version.timestamp() ) ); + + final DeleteByQueryResponse response = + esProvider.getClient().prepareDeleteByQuery( indexName ).setQuery( fqb ).execute().actionGet(); + + //error message needs to be retooled so that it describes the entity more throughly + logger.debug( "Deleted entity {}:{} with version {} from all index scopes with response status = {}", + new Object[] { id.getType(), id.getUuid(), version, response.status().toString() } ); + + checkDeleteByQueryResponse( fqb, response ); + } + + + /** + * Validate the response doens't contain errors, if it does, fail fast at the first error we encounter + * @param query + * @param response + */ + private void checkDeleteByQueryResponse( final QueryBuilder query, final DeleteByQueryResponse response ) { + for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) { + final ShardOperationFailedException[] failures = 
indexDeleteByQueryResponse.getFailures(); + + for ( ShardOperationFailedException failedException : failures ) { + throw new IndexException( String.format( + "Unable to delete by query %s. Failed with code %d and reason %s on shard %s in index %s", + query.toString(), failedException.status(), failedException.reason(), failedException.shardId(), + failedException.index() ) ); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43105f12/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java index c7592bc..ee5557c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java @@ -50,7 +50,9 @@ public class IndexingUtils { public static final String ENTITY_CONTEXT_FIELDNAME = "ug_context"; - public static final String ENTITYID_ID_FIELDNAME = "ug_entityId"; + public static final String ENTITY_ID_FIELDNAME = "ug_entityId"; + + public static final String ENTITY_VERSION_FIELDNAME = "ug_entityVersion"; @@ -159,8 +161,8 @@ public class IndexingUtils { //we need most specific mappings first since it's a stop on match algorithm .startObject() - .startObject( "context_template" ) - .field( "match", IndexingUtils.ENTITYID_ID_FIELDNAME ) + .startObject( "entity_id_template" ) + .field( "match", IndexingUtils.ENTITY_ID_FIELDNAME ) .field( "match_mapping_type", "string" ) .startObject( "mapping" ).field( "type", "string" ) .field( "index", "not_analyzed" ) @@ -170,7 +172,7 @@ public class IndexingUtils { .startObject() - .startObject( 
"context_template" ) + .startObject( "entity_context_template" ) .field( "match", IndexingUtils.ENTITY_CONTEXT_FIELDNAME ) .field( "match_mapping_type", "string" ) .startObject( "mapping" ).field( "type", "string" ) @@ -178,6 +180,15 @@ public class IndexingUtils { .endObject() .endObject() + .startObject() + .startObject( "entity_version_template" ) + .field( "match", IndexingUtils.ENTITY_VERSION_FIELDNAME ) + .field( "match_mapping_type", "string" ) + .startObject( "mapping" ).field( "type", "long" ) + .endObject() + .endObject() + .endObject() + // any string with field name that starts with sa_ gets analyzed .startObject() .startObject( "template_1" ).field( "match", ANALYZED_STRING_PREFIX + "*" )
