add application entity index
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/679812e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/679812e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/679812e6 Branch: refs/heads/USERGRID-347 Commit: 679812e6dbb52f1ab3b303adfe209167b3b25f7f Parents: 5ea8194 Author: Shawn Feldman <[email protected]> Authored: Thu Mar 19 10:13:48 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Mar 19 10:13:48 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/ManagerCache.java | 3 +- .../index/ApplicationEntityIndex.java | 42 +++ .../usergrid/persistence/index/EntityIndex.java | 25 +- .../persistence/index/EntityIndexFactory.java | 2 +- .../persistence/index/IndexIdentifier.java | 9 +- .../persistence/index/guice/IndexModule.java | 5 +- .../impl/EsApplicationEntityIndexImpl.java | 305 ++++++++++++++++ .../index/impl/EsEntityIndexBatchImpl.java | 8 +- .../index/impl/EsEntityIndexFactoryImpl.java | 27 +- .../index/impl/EsEntityIndexImpl.java | 351 ++----------------- .../persistence/index/impl/IndexingUtils.java | 15 - .../index/impl/CorePerformanceIT.java | 20 +- .../impl/EntityConnectionIndexImplTest.java | 20 +- .../persistence/index/impl/EntityIndexTest.java | 92 +++-- 14 files changed, 461 insertions(+), 463 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java index c1b7b95..d747f55 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java @@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapScope; @@ -45,7 +46,7 @@ public interface ManagerCache { * @param appScope * @return */ - EntityIndex getEntityIndex( ApplicationScope appScope ); + ApplicationEntityIndex getEntityIndex( ApplicationScope appScope ); /** * Get the graph manager for the graph scope http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java new file mode 100644 index 0000000..fab32b3 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java @@ -0,0 +1,42 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index; + +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; +import rx.Observable; + +/** + * Classy class class. + */ +public interface ApplicationEntityIndex { + + + /** + * Create the index batch. + */ + public EntityIndexBatch createBatch(); + + + /** + * Execute query in Usergrid syntax. + */ + public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index db4b50d..521a4e7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -26,13 +26,14 @@ import org.apache.usergrid.persistence.index.query.Query; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.model.entity.Id; import org.elasticsearch.action.ListenableActionFuture; +import rx.Observable; import java.util.Map; import java.util.concurrent.Future; /** - * Provides indexing of Entities within a scope. + * Provides management operations for single index */ public interface EntityIndex { @@ -41,12 +42,6 @@ public interface EntityIndex { * Otherwise we're introducing slowness into our system */ public void initializeIndex(); - - /** - * Delete the index from ES - */ - public ListenableActionFuture deleteIndex(); - /** * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists * @param indexSuffix index name @@ -56,27 +51,13 @@ public interface EntityIndex { */ public void addIndex(final String indexSuffix, final int shards, final int replicas, final String writeConsistency); - /** - * Create the index batch. - */ - public EntityIndexBatch createBatch(); - - - /** - * Execute query in Usergrid syntax. - */ - public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query ); /** * Refresh the index. */ public void refresh(); - /** - * Return the number of pending tasks in the cluster - * @return - */ - public int getPendingTasks(); + /** * Check health of cluster. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java index 10752d1..1c594e7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java @@ -25,7 +25,7 @@ import com.google.inject.assistedinject.Assisted; public interface EntityIndexFactory { - public EntityIndex createEntityIndex( + public ApplicationEntityIndex createApplicationEntityIndex( @Assisted ApplicationScope appScope); void invalidate(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java index 8465d84..1ac9d49 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java @@ -20,19 +20,14 @@ package org.apache.usergrid.persistence.index; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.impl.IndexingUtils; - /** * Class is used to generate an index name and alias name */ public class IndexIdentifier{ private final IndexFig config; - private final ApplicationScope applicationScope; - public IndexIdentifier(IndexFig config, ApplicationScope applicationScope) { + public IndexIdentifier(IndexFig config) { this.config = config; - this.applicationScope = applicationScope; } /** @@ -76,7 +71,7 @@ public class IndexIdentifier{ } public String toString() { - return "application: " + applicationScope.getApplication().getUuid(); + return "index id"+config.getIndexPrefix(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 95f3bd4..c9125c5 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -45,8 +45,11 @@ public class IndexModule extends AbstractModule { install(new MapModule()); install(new QueueModule()); - bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class ); + bind(AliasedEntityIndex.class).to(EsEntityIndexImpl.class); + bind(EntityIndex.class).to(EsEntityIndexImpl.class); + bind(IndexIdentifier.class); + bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class); bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java new file mode 100644 index 0000000..f8ed107 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -0,0 +1,305 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index.impl; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.ValidationUtils; +import org.apache.usergrid.persistence.index.*; +import org.apache.usergrid.persistence.index.query.CandidateResult; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; +import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; +import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.functions.Action1; +import rx.schedulers.Schedulers; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*; +import static org.apache.usergrid.persistence.index.impl.IndexingUtils.SPLITTER; + +/** + * Classy class class. + */ +public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{ + + private static final Logger logger = LoggerFactory.getLogger(EsApplicationEntityIndexImpl.class); + + private final ApplicationScope applicationScope; + private final IndexIdentifier indexIdentifier; + private final Timer searchTimer; + private final Timer cursorTimer; + private final MapManager mapManager; + private final AliasedEntityIndex entityIndex; + private final IndexBufferProducer indexBatchBufferProducer; + private final EsIndexCache indexCache; + private final IndexFig indexFig; + private final EsProvider esProvider; + private final IndexIdentifier.IndexAlias alias; + private FailureMonitor failureMonitor; + private final int cursorTimeout; + @Inject + public EsApplicationEntityIndexImpl(@Assisted ApplicationScope appScope, final AliasedEntityIndex entityIndex, final IndexFig config, + final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, + final EsIndexCache indexCache, final MetricsFactory metricsFactory, + final MapManagerFactory mapManagerFactory, final IndexFig indexFig, final IndexIdentifier indexIdentifier){ + this.entityIndex = entityIndex; + this.indexBatchBufferProducer = indexBatchBufferProducer; + this.indexCache = indexCache; + this.indexFig = indexFig; + this.indexIdentifier = indexIdentifier; + ValidationUtils.validateApplicationScope(appScope); + this.applicationScope = appScope; + final MapScope mapScope = new MapScopeImpl(appScope.getApplication(), "cursorcache"); + this.failureMonitor = new FailureMonitorImpl(config, provider); + this.esProvider = provider; + + mapManager = mapManagerFactory.createMapManager(mapScope); + this.searchTimer = metricsFactory + .getTimer(EsEntityIndexImpl.class, "search.timer"); + this.cursorTimer = metricsFactory + .getTimer(EsEntityIndexImpl.class, "search.cursor.timer"); + this.cursorTimeout = config.getQueryCursorTimeout(); + + this.alias = indexIdentifier.getAlias(); + + } + + @Override + public EntityIndexBatch createBatch() { + EntityIndexBatch batch = new EsEntityIndexBatchImpl( + applicationScope, indexBatchBufferProducer, entityIndex, indexIdentifier ); + return batch; + } + + @Override + public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, + final Query query ) { + + final String context = IndexingUtils.createContextName(indexScope); + final String[] entityTypes = searchTypes.getTypeNames(applicationScope); + QueryBuilder qb = query.createQueryBuilder(context); + SearchResponse searchResponse; + + if ( query.getCursor() == null ) { + SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() ) + .setTypes(entityTypes) + .setScroll(cursorTimeout + "m") + .setQuery(qb); + + final FilterBuilder fb = query.createFilterBuilder(); + + //we have post filters, apply them + if ( fb != null ) { + logger.debug( " Filter: {} ", fb.toString() ); + srb = srb.setPostFilter( fb ); + } + + + srb = srb.setFrom( 0 ).setSize( query.getLimit() ); + + for ( Query.SortPredicate sp : query.getSortPredicates() ) { + + final SortOrder order; + if ( sp.getDirection().equals( Query.SortDirection.ASCENDING ) ) { + order = SortOrder.ASC; + } + else { + order = SortOrder.DESC; + } + + // we do not know the type of the "order by" property and so we do not know what + // type prefix to use. So, here we add an order by clause for every possible type + // that you can order by: string, number and boolean and we ask ElasticSearch + // to ignore any fields that are not present. + + final String stringFieldName = STRING_PREFIX + sp.getPropertyName(); + final FieldSortBuilder stringSort = SortBuilders.fieldSort(stringFieldName) + .order( order ).ignoreUnmapped( true ); + srb.addSort( stringSort ); + + logger.debug( " Sort: {} order by {}", stringFieldName, order.toString() ); + + final String longFieldName = LONG_PREFIX + sp.getPropertyName(); + final FieldSortBuilder longSort = SortBuilders.fieldSort( longFieldName ) + .order( order ).ignoreUnmapped( true ); + srb.addSort( longSort ); + logger.debug( " Sort: {} order by {}", longFieldName, order.toString() ); + + + final String doubleFieldName = DOUBLE_PREFIX + sp.getPropertyName(); + final FieldSortBuilder doubleSort = SortBuilders.fieldSort( doubleFieldName ) + .order( order ).ignoreUnmapped( true ); + srb.addSort( doubleSort ); + logger.debug( " Sort: {} order by {}", doubleFieldName, order.toString() ); + + + final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName(); + final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName ) + .order( order ).ignoreUnmapped( true ); + srb.addSort( booleanSort ); + logger.debug( " Sort: {} order by {}", booleanFieldName, order.toString() ); + } + + + if ( logger.isDebugEnabled() ) { + logger.debug( "Searching index (read alias): {}\n scope: {} \n type: {}\n query: {} ", + this.alias.getReadAlias(), context, entityTypes, srb ); + } + + try { + //Added For Graphite Metrics + Timer.Context timeSearch = searchTimer.time(); + searchResponse = srb.execute().actionGet(); + timeSearch.stop(); + } + catch ( Throwable t ) { + logger.error( "Unable to communicate with Elasticsearch", t ); + failureMonitor.fail( "Unable to execute batch", t ); + throw t; + } + + + failureMonitor.success(); + } + else { + String userCursorString = query.getCursor(); + if ( userCursorString.startsWith( "\"" ) ) { + userCursorString = userCursorString.substring( 1 ); + } + if ( userCursorString.endsWith( "\"" ) ) { + userCursorString = userCursorString.substring( 0, userCursorString.length() - 1 ); + } + + //now get the cursor from the map and validate + final String esScrollCursor = mapManager.getString( userCursorString ); + + Preconditions.checkArgument(esScrollCursor != null, "Could not find a cursor for the value '{}' ", esScrollCursor); + + + + logger.debug( "Executing query with cursor: {} ", esScrollCursor ); + + + SearchScrollRequestBuilder ssrb = esProvider.getClient() + .prepareSearchScroll(esScrollCursor).setScroll( cursorTimeout + "m" ); + + try { + //Added For Graphite Metrics + Timer.Context timeSearchCursor = cursorTimer.time(); + searchResponse = ssrb.execute().actionGet(); + timeSearchCursor.stop(); + } + catch ( Throwable t ) { + logger.error( "Unable to communicate with elasticsearch", t ); + failureMonitor.fail( "Unable to execute batch", t ); + throw t; + } + + + failureMonitor.success(); + } + + return parseResults(searchResponse, query); + } + + + private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) { + + final SearchHits searchHits = searchResponse.getHits(); + final SearchHit[] hits = searchHits.getHits(); + final int length = hits.length; + + logger.debug(" Hit count: {} Total hits: {}", length, searchHits.getTotalHits()); + + List<CandidateResult> candidates = new ArrayList<>( length ); + + for ( SearchHit hit : hits ) { + + String[] idparts = hit.getId().split( SPLITTER ); + String id = idparts[0]; + String type = idparts[1]; + String version = idparts[2]; + + Id entityId = new SimpleId( UUID.fromString(id), type ); + + candidates.add( new CandidateResult( entityId, UUID.fromString( version ) ) ); + } + + CandidateResults candidateResults = new CandidateResults( query, candidates ); + + if ( candidates.size() >= query.getLimit() ) { + //USERGRID-461 our cursor is getting too large, map it to a new time UUID + //TODO T.N., this shouldn't live here. This should live at the UG core tier. However the RM/EM are an absolute mess, so until they're refactored, this is it's home + + final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + + final String esScrollCursor = searchResponse.getScrollId(); + + //now set this into our map module + final int minutes = indexFig.getQueryCursorTimeout(); + + //just truncate it, we'll never hit a long value anyway + mapManager.putString( userCursorString, esScrollCursor, ( int ) TimeUnit.MINUTES.toSeconds( minutes ) ); + + candidateResults.setCursor( userCursorString ); + logger.debug(" User cursor = {}, Cursor = {} ", userCursorString, esScrollCursor); + } + + return candidateResults; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index d7b1f9c..1082201 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -76,13 +76,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, - final IndexBufferProducer indexBatchBufferProducer,final IndexFig config, - final AliasedEntityIndex entityIndex ) { + final IndexBufferProducer indexBatchBufferProducer, + final AliasedEntityIndex entityIndex, IndexIdentifier indexIdentifier ) { this.applicationScope = applicationScope; this.indexBatchBufferProducer = indexBatchBufferProducer; this.entityIndex = entityIndex; - this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope); + this.indexIdentifier = indexIdentifier; this.alias = indexIdentifier.getAlias(); //constrained this.container = new IndexOperationMessage(); @@ -96,7 +96,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { ValidationUtils.verifyVersion( entity.getVersion() ); //add app id for indexing entity.setField( - new StringField(APPLICATION_ID_FIELDNAME,IndexingUtils.idString(applicationScope.getApplication())) + new StringField(APPLICATION_ID_FIELDNAME, IndexingUtils.idString(applicationScope.getApplication())) ); final String context = createContextName(indexScope); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java index 8af309d..426f587 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java @@ -24,12 +24,10 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexBufferProducer; -import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.map.MapManagerFactory; import java.util.concurrent.ExecutionException; @@ -46,12 +44,15 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ private final MetricsFactory metricsFactory; private final MapManagerFactory mapManagerFactory; private final IndexFig indexFig; + private final AliasedEntityIndex entityIndex; + private final IndexIdentifier indexIdentifier; - private LoadingCache<ApplicationScope, EntityIndex> eiCache = - CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() { - public EntityIndex load( ApplicationScope scope ) { - return new EsEntityIndexImpl(scope,config, indexBatchBufferProducer, provider,indexCache, metricsFactory, - mapManagerFactory, indexFig ); + private LoadingCache<ApplicationScope, ApplicationEntityIndex> eiCache = + CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, ApplicationEntityIndex>() { + public ApplicationEntityIndex load( ApplicationScope scope ) { + return new EsApplicationEntityIndexImpl( + scope,entityIndex,config, indexBatchBufferProducer, provider,indexCache, metricsFactory, mapManagerFactory, indexFig, indexIdentifier + ); } } ); @@ -59,7 +60,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider, final EsIndexCache indexCache, final IndexBufferProducer indexBatchBufferProducer, final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory, - final IndexFig indexFig ){ + final IndexFig indexFig, final AliasedEntityIndex entityIndex, final IndexIdentifier indexIdentifier ){ this.config = config; this.provider = provider; this.indexCache = indexCache; @@ -67,10 +68,14 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ this.metricsFactory = metricsFactory; this.mapManagerFactory = mapManagerFactory; this.indexFig = indexFig; + this.entityIndex = entityIndex; + this.indexIdentifier = indexIdentifier; } + + @Override - public EntityIndex createEntityIndex(final ApplicationScope appScope) { + public ApplicationEntityIndex createApplicationEntityIndex(@Assisted final ApplicationScope appScope) { try{ return eiCache.get(appScope); }catch (ExecutionException ee){ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index f22016e..ae5e3c3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -18,55 +18,29 @@ package org.apache.usergrid.persistence.index.impl; -import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.yammer.metrics.core.Clock; +import com.google.inject.Singleton; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.Health; -import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.exceptions.IndexException; -import org.apache.usergrid.persistence.index.query.CandidateResult; -import org.apache.usergrid.persistence.index.query.CandidateResults; -import org.apache.usergrid.persistence.index.query.Query; -import org.apache.usergrid.persistence.index.utils.UUIDUtils; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; + import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.ShardOperationFailedException; + import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; -import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; -import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequestBuilder; + import org.elasticsearch.client.AdminClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -75,61 +49,39 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.*; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.indices.InvalidAliasNameException; -import org.elasticsearch.rest.action.admin.indices.alias.delete.AliasesMissingException; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.sort.FieldSortBuilder; -import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.search.sort.SortOrder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.functions.Func1; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*; +import java.io.IOException; /** * Implements index using ElasticSearch Java API. */ +@Singleton public class EsEntityIndexImpl implements AliasedEntityIndex { private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class ); - private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false ); public static final String DEFAULT_TYPE = "_default_"; private final IndexIdentifier.IndexAlias alias; - private final IndexIdentifier indexIdentifier; private final IndexBufferProducer indexBatchBufferProducer; private final IndexFig indexFig; private final Timer addTimer; private final Timer updateAliasTimer; - private final Timer searchTimer; + /** * We purposefully make this per instance. Some indexes may work, while others may fail */ - private FailureMonitor failureMonitor; - private final ApplicationScope applicationScope; private final EsProvider esProvider; - - private final int cursorTimeout; - private final IndexFig config; - - //number of times to wait for the index to refresh properly. private static final int MAX_WAITS = 10; //number of milliseconds to try again before sleeping @@ -141,53 +93,41 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { ImmutableMap.<String, Object>builder().put( "field", "test" ).put(IndexingUtils.ENTITYID_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build(); private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery(); + private final IndexIdentifier indexIdentifier; private EsIndexCache aliasCache; private Timer mappingTimer; private Timer refreshTimer; - private Timer cursorTimer; - private Timer getVersionsTimer; - private final MapManager mapManager; // private final Timer indexTimer; @Inject - public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, + public EsEntityIndexImpl( final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache, final MetricsFactory metricsFactory, - final MapManagerFactory mapManagerFactory, final IndexFig indexFig ) { + final IndexFig indexFig, final IndexIdentifier indexIdentifier ) { this.indexBatchBufferProducer = indexBatchBufferProducer; this.indexFig = indexFig; - ValidationUtils.validateApplicationScope( appScope ); - this.applicationScope = appScope; + this.indexIdentifier = indexIdentifier; + this.esProvider = provider; this.config = config; - this.cursorTimeout = config.getQueryCursorTimeout(); - this.indexIdentifier = IndexingUtils.createIndexIdentifier( config, appScope ); + + + this.alias = indexIdentifier.getAlias(); - this.failureMonitor = new FailureMonitorImpl( config, provider ); this.aliasCache = indexCache; this.addTimer = metricsFactory - .getTimer( EsEntityIndexImpl.class, "add.timer" ); + .getTimer(EsEntityIndexImpl.class, "add.timer"); this.updateAliasTimer = metricsFactory - .getTimer( EsEntityIndexImpl.class, "update.alias.timer" ); + .getTimer(EsEntityIndexImpl.class, "update.alias.timer"); this.mappingTimer = metricsFactory - .getTimer( EsEntityIndexImpl.class, "create.mapping.timer" ); + .getTimer(EsEntityIndexImpl.class, "create.mapping.timer"); this.refreshTimer = metricsFactory - .getTimer( EsEntityIndexImpl.class, "refresh.timer" ); - this.searchTimer =metricsFactory - .getTimer( EsEntityIndexImpl.class, "search.timer" ); - this.cursorTimer = metricsFactory - .getTimer( EsEntityIndexImpl.class, "search.cursor.timer" ); - this.getVersionsTimer =metricsFactory - .getTimer( EsEntityIndexImpl.class, "get.versions.timer" ); - - - final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" ); + .getTimer(EsEntityIndexImpl.class, "refresh.timer"); - mapManager = mapManagerFactory.createMapManager(mapScope); } @Override @@ -385,190 +325,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { } - @Override - public EntityIndexBatch createBatch() { - EntityIndexBatch batch = new EsEntityIndexBatchImpl( - applicationScope, indexBatchBufferProducer, config, this ); - return batch; - } - - - @Override - public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, - final Query query ) { - - final String context = IndexingUtils.createContextName(indexScope); - final String[] entityTypes = searchTypes.getTypeNames(applicationScope); - QueryBuilder qb = query.createQueryBuilder(context); - SearchResponse searchResponse; - - if ( query.getCursor() == null ) { - SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() ) - .setTypes(entityTypes) - .setScroll(cursorTimeout + "m") - .setQuery(qb); - - final FilterBuilder fb = query.createFilterBuilder(); - - //we have post filters, apply them - if ( fb != null ) { - logger.debug( " Filter: {} ", fb.toString() ); - srb = srb.setPostFilter( fb ); - } - - - srb = srb.setFrom( 0 ).setSize( query.getLimit() ); - - for ( Query.SortPredicate sp : query.getSortPredicates() ) { - - final SortOrder order; - if ( sp.getDirection().equals( Query.SortDirection.ASCENDING ) ) { - order = SortOrder.ASC; - } - else { - order = SortOrder.DESC; - } - - // we do not know the type of the "order by" property and so we do not know what - // type prefix to use. So, here we add an order by clause for every possible type - // that you can order by: string, number and boolean and we ask ElasticSearch - // to ignore any fields that are not present. - - final String stringFieldName = STRING_PREFIX + sp.getPropertyName(); - final FieldSortBuilder stringSort = SortBuilders.fieldSort( stringFieldName ) - .order( order ).ignoreUnmapped( true ); - srb.addSort( stringSort ); - - logger.debug( " Sort: {} order by {}", stringFieldName, order.toString() ); - - final String longFieldName = LONG_PREFIX + sp.getPropertyName(); - final FieldSortBuilder longSort = SortBuilders.fieldSort( longFieldName ) - .order( order ).ignoreUnmapped( true ); - srb.addSort( longSort ); - logger.debug( " Sort: {} order by {}", longFieldName, order.toString() ); - - - final String doubleFieldName = DOUBLE_PREFIX + sp.getPropertyName(); - final FieldSortBuilder doubleSort = SortBuilders.fieldSort( doubleFieldName ) - .order( order ).ignoreUnmapped( true ); - srb.addSort( doubleSort ); - logger.debug( " Sort: {} order by {}", doubleFieldName, order.toString() ); - - - final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName(); - final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName ) - .order( order ).ignoreUnmapped( true ); - srb.addSort( booleanSort ); - logger.debug( " Sort: {} order by {}", booleanFieldName, order.toString() ); - } - - - if ( logger.isDebugEnabled() ) { - logger.debug( "Searching index (read alias): {}\n scope: {} \n type: {}\n query: {} ", - this.alias.getReadAlias(), context, entityTypes, srb ); - } - - try { - //Added For Graphite Metrics - Timer.Context timeSearch = searchTimer.time(); - searchResponse = srb.execute().actionGet(); - timeSearch.stop(); - } - catch ( Throwable t ) { - logger.error( "Unable to communicate with Elasticsearch", t ); - failureMonitor.fail( "Unable to execute batch", t ); - throw t; - } - - - failureMonitor.success(); - } - else { - String userCursorString = query.getCursor(); - if ( userCursorString.startsWith( "\"" ) ) { - userCursorString = userCursorString.substring( 1 ); - } - if ( userCursorString.endsWith( "\"" ) ) { - userCursorString = userCursorString.substring( 0, userCursorString.length() - 1 ); - } - - //now get the cursor from the map and validate - final String esScrollCursor = mapManager.getString( userCursorString ); - - Preconditions.checkArgument(esScrollCursor != null, "Could not find a cursor for the value '{}' ", esScrollCursor); - - - - logger.debug( "Executing query with cursor: {} ", esScrollCursor ); - - - SearchScrollRequestBuilder ssrb = esProvider.getClient() - .prepareSearchScroll(esScrollCursor).setScroll( cursorTimeout + "m" ); - - try { - //Added For Graphite Metrics - Timer.Context timeSearchCursor = cursorTimer.time(); - searchResponse = ssrb.execute().actionGet(); - timeSearchCursor.stop(); - } - catch ( Throwable t ) { - logger.error( "Unable to communicate with elasticsearch", t ); - failureMonitor.fail( "Unable to execute batch", t ); - throw t; - } - - - failureMonitor.success(); - } - - return parseResults(searchResponse, query); - } - - - private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) { - - final SearchHits searchHits = searchResponse.getHits(); - final SearchHit[] hits = searchHits.getHits(); - final int length = hits.length; - - logger.debug(" Hit count: {} Total hits: {}", length, searchHits.getTotalHits()); - - List<CandidateResult> candidates = new ArrayList<>( length ); - - for ( SearchHit hit : hits ) { - - String[] idparts = hit.getId().split( SPLITTER ); - String id = idparts[0]; - String type = idparts[1]; - String version = idparts[2]; - - Id entityId = new SimpleId( UUID.fromString( id ), type ); - - candidates.add( new CandidateResult( entityId, UUID.fromString( version ) ) ); - } - - CandidateResults candidateResults = new CandidateResults( query, candidates ); - - if ( candidates.size() >= query.getLimit() ) { - //USERGRID-461 our cursor is getting too large, map it to a new time UUID - //TODO T.N., this shouldn't live here. This should live at the UG core tier. However the RM/EM are an absolute mess, so until they're refactored, this is it's home - - final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); - - final String esScrollCursor = searchResponse.getScrollId(); - - //now set this into our map module - final int minutes = indexFig.getQueryCursorTimeout(); - - //just truncate it, we'll never hit a long value anyway - mapManager.putString( userCursorString, esScrollCursor, ( int ) TimeUnit.MINUTES.toSeconds( minutes ) ); - - candidateResults.setCursor( userCursorString ); - logger.debug(" User cursor = {}, Cursor = {} ", userCursorString, esScrollCursor); - } - - return candidateResults; - } public void refresh() { @@ -587,8 +343,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { ); if ( indexes.length == 0 ) { - logger.debug( "Not refreshing indexes, none found for app {}", - applicationScope.getApplication().getUuid() ); + logger.debug( "Not refreshing indexes. none found"); return true; } //Added For Graphite Metrics @@ -609,70 +364,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { } - @Override - public int getPendingTasks() { - - final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin() - .cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet(); - - return tasksResponse.pendingTasks().size(); - } - - - - /** - * Completely delete an index. - */ - public ListenableActionFuture deleteIndex() { - //TODO: add timer - //TODO: add all indexes - String idString = IndexingUtils.idString(applicationScope.getApplication()); - - final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString); - - //Added For Graphite Metrics - - final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient() - .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute(); - - response.addListener(new ActionListener<DeleteByQueryResponse>() { - - @Override - public void onResponse(DeleteByQueryResponse response) { - checkDeleteByQueryResponse(tqb,response); - } - - - @Override - public void onFailure(Throwable e) { - logger.error("failed on delete index",e); - } - }); - return response; - } - /** - * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter - */ - private void checkDeleteByQueryResponse( - final QueryBuilder query, final DeleteByQueryResponse response ) { - - for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) { - final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures(); - - for ( ShardOperationFailedException failedException : failures ) { - logger.error( String.format("Unable to delete by query %s. " - + "Failed with code %d and reason %s on shard %s in index %s", - query.toString(), - failedException.status().getStatus(), - failedException.reason(), - failedException.shardId(), - failedException.index() ) - ); - } - - } - } - /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java index d49d8cc..c74ce93 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java @@ -96,21 +96,6 @@ public class IndexingUtils { return sb.toString(); } - - /** - * Create the facilities to retrieve an index name and alias name - * @param fig - * @param applicationScope - * @return - */ - public static IndexIdentifier createIndexIdentifier(IndexFig fig, ApplicationScope applicationScope) { - return new IndexIdentifier(fig,applicationScope); - } - - - - - /** * Create the index doc from the given entity * @param entity http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java index c1bfe38..303d481 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.apache.usergrid.persistence.index.*; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; @@ -40,11 +41,6 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.guice.TestIndexModule; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.index.query.EntityResults; @@ -83,6 +79,7 @@ public class CorePerformanceIT extends BaseIT { static EntityCollectionManagerFactory ecmf; static EntityIndexFactory ecif ; + static EntityIndex ecf; @Ignore("Relies on finefoods.txt which must be downloaded separately") @@ -97,6 +94,7 @@ public class CorePerformanceIT extends BaseIT { ecmf = injector.getInstance( EntityCollectionManagerFactory.class ); ecif = injector.getInstance( EntityIndexFactory.class ); + ecf = injector.getInstance(EntityIndex.class); final ApplicationScope scope = new ApplicationScopeImpl( new SimpleId( "application" ) ); @@ -174,7 +172,7 @@ public class CorePerformanceIT extends BaseIT { public void run() { - EntityIndex eci = ecif.createEntityIndex( scope); + ApplicationEntityIndex eci = ecif.createApplicationEntityIndex(scope); EntityCollectionManager ecm = ecmf.createCollectionManager( new CollectionScopeImpl( scope.getApplication(), indexScope.getOwner(), indexScope.getName() ) ); Query query = Query.fromQL( "review_score > 0"); // get all reviews; @@ -219,7 +217,8 @@ public class CorePerformanceIT extends BaseIT { CollectionScope collectionScope = new CollectionScopeImpl( applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() ); EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope ); - EntityIndex eci = ecif.createEntityIndex(applicationScope ); + ApplicationEntityIndex eci = ecif.createApplicationEntityIndex(applicationScope); + FileReader fr; try { @@ -304,7 +303,7 @@ public class CorePerformanceIT extends BaseIT { throw new RuntimeException("Error reading file", ex); } - eci.refresh(); + ecf.refresh(); } } @@ -312,7 +311,8 @@ public class CorePerformanceIT extends BaseIT { public void runSelectedQueries(final ApplicationScope scope, List<IndexScope> indexScopes ) { for ( IndexScope indexScope : indexScopes ) { - EntityIndex eci = ecif.createEntityIndex(scope ); + ApplicationEntityIndex eci = ecif.createApplicationEntityIndex(scope); + // TODO: come up with more and more complex queries for CorePerformanceIT @@ -330,7 +330,7 @@ public class CorePerformanceIT extends BaseIT { } } - public static void query(final IndexScope indexScope, final EntityIndex eci, final String query ) {; + public static void query(final IndexScope indexScope, final ApplicationEntityIndex eci, final String query ) {; Query q = Query.fromQL(query) ; // CandidateResults candidateResults = eci.search(indexScope, q ); TODO FIXME // log.info("size = {} returned from query {}", candidateResults.size(), q.getQl() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java index a399809..8e8d6c0 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index.impl; import java.io.IOException; import java.util.HashMap; +import org.apache.usergrid.persistence.index.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -30,11 +31,6 @@ import org.apache.usergrid.persistence.collection.util.EntityUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.exceptions.QueryParseException; import org.apache.usergrid.persistence.index.guice.TestIndexModule; import org.apache.usergrid.persistence.index.query.CandidateResult; @@ -65,6 +61,8 @@ public class EntityConnectionIndexImplTest extends BaseIT { @Inject public EntityIndexFactory ecif; + @Inject + public EntityIndex ei; @Test public void testBasicOperation() throws IOException, InterruptedException { @@ -117,8 +115,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { IndexScope otherIndexScope = new IndexScopeImpl( new SimpleId( UUIDGenerator.newTimeUUID(), "animal" ), "likes" ); - EntityIndex personLikesIndex = ecif.createEntityIndex( applicationScope ); - personLikesIndex.initializeIndex(); + ApplicationEntityIndex personLikesIndex = ecif.createApplicationEntityIndex(applicationScope); EntityIndexBatch batch = personLikesIndex.createBatch(); @@ -137,7 +134,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.index( otherIndexScope, oj ); batch.execute().get(); - personLikesIndex.refresh(); + ei.refresh(); Thread.sleep( 2000 ); @@ -247,8 +244,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { IndexScope otherIndexScope = new IndexScopeImpl( new SimpleId( UUIDGenerator.newTimeUUID(), "animal" ), "likes" ); - EntityIndex personLikesIndex = ecif.createEntityIndex( applicationScope ); - personLikesIndex.initializeIndex(); + ApplicationEntityIndex personLikesIndex = ecif.createApplicationEntityIndex(applicationScope); EntityIndexBatch batch = personLikesIndex.createBatch(); @@ -267,7 +263,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.index( otherIndexScope, oj ); batch.execute().get(); - personLikesIndex.refresh(); + ei.refresh(); // now, let's search for muffins @@ -285,7 +281,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.deindex( searchScope, muffin ); batch.deindex( searchScope, oj ); batch.execute().get(); - personLikesIndex.refresh(); + ei.refresh(); likes = personLikesIndex.search( searchScope, SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ), http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/679812e6/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 047fbe8..483efb9 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -70,6 +70,8 @@ public class EntityIndexTest extends BaseIT { @Inject public EntityIndexFactory eif; + @Inject + public EntityIndex ei; //TODO T.N. Remove this when we move the cursor mapping back to core @Inject @@ -82,9 +84,8 @@ public class EntityIndexTest extends BaseIT { Id appId = new SimpleId( "application" ); ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); final String entityType = "thing"; IndexScope indexScope = new IndexScopeImpl( appId, "things" ); @@ -92,7 +93,7 @@ public class EntityIndexTest extends BaseIT { insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",101,0); - entityIndex.refresh(); + ei.refresh(); testQueries( indexScope, searchTypes, entityIndex ); @@ -105,8 +106,7 @@ public class EntityIndexTest extends BaseIT { ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); final String entityType = "thing"; IndexScope indexScope = new IndexScopeImpl( appId, "things" ); @@ -133,7 +133,7 @@ public class EntityIndexTest extends BaseIT { batch.index(indexScope, entity); batch.execute().get(); - entityIndex.refresh(); + ei.refresh(); testQueries( indexScope, searchTypes, entityIndex ); } @@ -147,10 +147,10 @@ public class EntityIndexTest extends BaseIT { long now = System.currentTimeMillis(); final int threads = 20; final int size = 30; - final EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); + final ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); final IndexScope indexScope = new IndexScopeImpl(appId, "things"); final String entityType = "thing"; - entityIndex.initializeIndex(); + ei.initializeIndex(); final CountDownLatch latch = new CountDownLatch(threads); final AtomicLong failTime=new AtomicLong(0); InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" ); @@ -192,10 +192,10 @@ public class EntityIndexTest extends BaseIT { ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); + ei.initializeIndex(); for(int i=0;i<10;i++) { - entityIndex.initializeIndex(); + ei.initializeIndex(); } } @@ -206,8 +206,8 @@ public class EntityIndexTest extends BaseIT { ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); + ei.initializeIndex(); final String entityType = "thing"; IndexScope indexScope = new IndexScopeImpl( appId, "things" ); @@ -215,15 +215,15 @@ public class EntityIndexTest extends BaseIT { insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",101,0); - entityIndex.refresh(); + ei.refresh(); testQueries( indexScope, searchTypes, entityIndex ); - entityIndex.addIndex("v2", 1,0,"one"); + ei.addIndex("v2", 1,0,"one"); insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",101,100); - entityIndex.refresh(); + ei.refresh(); //Hilda Youn testQuery(indexScope, searchTypes, entityIndex, "name = 'Hilda Young'", 1 ); @@ -237,9 +237,8 @@ public class EntityIndexTest extends BaseIT { ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope ); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex( applicationScope ); - entityIndex.initializeIndex(); final String entityType = "thing"; IndexScope indexScope = new IndexScopeImpl( appId, "things" ); @@ -247,33 +246,33 @@ public class EntityIndexTest extends BaseIT { insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",1,0); - entityIndex.refresh(); + ei.refresh(); - entityIndex.addIndex("v2", 1, 0, "one"); + ei.addIndex("v2", 1, 0, "one"); insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json", 1, 0); - entityIndex.refresh(); + ei.refresh(); CandidateResults crs = testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 2); EntityIndexBatch entityIndexBatch = entityIndex.createBatch(); entityIndexBatch.deindex(indexScope, crs.get(0)); entityIndexBatch.deindex(indexScope, crs.get(1)); entityIndexBatch.execute().get(); - entityIndex.refresh(); + ei.refresh(); //Hilda Youn testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0); } - private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException { + private void insertJsonBlob(ApplicationEntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException { InputStream is = this.getClass().getResourceAsStream( filePath ); ObjectMapper mapper = new ObjectMapper(); List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} ); EntityIndexBatch batch = entityIndex.createBatch(); insertJsonBlob(sampleJson,batch, entityType, indexScope, max, startIndex); batch.execute().get(); - entityIndex.refresh(); + ei.refresh(); } private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException { @@ -320,8 +319,8 @@ public class EntityIndexTest extends BaseIT { IndexScope indexScope = new IndexScopeImpl( appId, "fastcars" ); - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); + ei.initializeIndex(); Map entityMap = new HashMap() {{ put( "name", "Ferrari 212 Inter" ); @@ -336,7 +335,7 @@ public class EntityIndexTest extends BaseIT { entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID())); entityIndex.createBatch().index(indexScope , entity ).execute().get(); - entityIndex.refresh(); + ei.refresh(); CandidateResults candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes( entity.getId().getType() ), Query.fromQL( "name contains 'Ferrari*'" ) ); @@ -345,14 +344,14 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.deindex(indexScope, entity).execute().get(); batch.execute().get(); - entityIndex.refresh(); + ei.refresh(); candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()), Query.fromQL( "name contains 'Ferrari*'" ) ); assertEquals( 0, candidateResults.size() ); } - private CandidateResults testQuery(final IndexScope scope, final SearchTypes searchTypes, final EntityIndex entityIndex, final String queryString, final int num ) { + private CandidateResults testQuery(final IndexScope scope, final SearchTypes searchTypes, final ApplicationEntityIndex entityIndex, final String queryString, final int num ) { StopWatch timer = new StopWatch(); timer.start(); @@ -367,7 +366,7 @@ public class EntityIndexTest extends BaseIT { } - private void testQueries(final IndexScope scope, SearchTypes searchTypes, final EntityIndex entityIndex ) { + private void testQueries(final IndexScope scope, SearchTypes searchTypes, final ApplicationEntityIndex entityIndex ) { testQuery(scope, searchTypes, entityIndex, "name = 'Morgan Pierce'", 1 ); @@ -451,7 +450,7 @@ public class EntityIndexTest extends BaseIT { IndexScope appScope = new IndexScopeImpl( ownerId, "user" ); - EntityIndex ei = eif.createEntityIndex( applicationScope ); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); ei.initializeIndex(); final String middleName = "middleName" + UUIDUtils.newTimeUUID(); @@ -467,7 +466,7 @@ public class EntityIndexTest extends BaseIT { EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() ); - EntityIndexBatch batch = ei.createBatch(); + EntityIndexBatch batch = entityIndex.createBatch(); batch.index( appScope, user); batch.execute().get(); @@ -475,7 +474,7 @@ public class EntityIndexTest extends BaseIT { Query query = new Query(); query.addEqualityFilter( "username", "edanuff" ); - CandidateResults r = ei.search( appScope, SearchTypes.fromTypes( "edanuff" ), query ); + CandidateResults r = entityIndex.search( appScope, SearchTypes.fromTypes( "edanuff" ), query ); assertEquals( user.getId(), r.get( 0 ).getId() ); batch.deindex(appScope, user.getId(), user.getVersion() ); @@ -486,7 +485,7 @@ public class EntityIndexTest extends BaseIT { query = new Query(); query.addEqualityFilter( "username", "edanuff" ); - r = ei.search(appScope,SearchTypes.fromTypes( "edanuff" ), query ); + r = entityIndex.search(appScope,SearchTypes.fromTypes( "edanuff" ), query ); assertFalse( r.iterator().hasNext() ); } @@ -500,9 +499,9 @@ public class EntityIndexTest extends BaseIT { IndexScope appScope = new IndexScopeImpl( ownerId, "user" ); - EntityIndex ei = eif.createEntityIndex( applicationScope ); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); ei.initializeIndex(); - ei.createBatch(); + entityIndex.createBatch(); // Bill has favorites as string, age as string and retirement goal as number Map billMap = new HashMap() {{ @@ -516,7 +515,7 @@ public class EntityIndexTest extends BaseIT { EntityUtils.setId( bill, new SimpleId( UUIDGenerator.newTimeUUID(), "user" ) ); EntityUtils.setVersion( bill, UUIDGenerator.newTimeUUID() ); - EntityIndexBatch batch = ei.createBatch(); + EntityIndexBatch batch = entityIndex.createBatch(); batch.index( appScope, bill ); @@ -547,22 +546,22 @@ public class EntityIndexTest extends BaseIT { Query query = new Query(); query.addEqualityFilter( "username", "bill" ); - CandidateResults r = ei.search( appScope, searchTypes, query ); + CandidateResults r = entityIndex.search( appScope, searchTypes, query ); assertEquals( bill.getId(), r.get( 0 ).getId() ); query = new Query(); query.addEqualityFilter( "username", "fred" ); - r = ei.search( appScope, searchTypes, query ); + r = entityIndex.search( appScope, searchTypes, query ); assertEquals( fred.getId(), r.get( 0 ).getId() ); query = new Query(); query.addEqualityFilter( "age", 41 ); - r = ei.search( appScope, searchTypes, query ); + r = entityIndex.search( appScope, searchTypes, query ); assertEquals( fred.getId(), r.get( 0 ).getId() ); query = new Query(); query.addEqualityFilter( "age", "thirtysomething" ); - r = ei.search( appScope, searchTypes, query ); + r = entityIndex.search( appScope, searchTypes, query ); assertEquals( bill.getId(), r.get( 0 ).getId() ); } @@ -570,11 +569,6 @@ public class EntityIndexTest extends BaseIT { @Test public void healthTest() { - Id appId = new SimpleId( "application" ); - ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - - EntityIndex ei = eif.createEntityIndex( applicationScope ); - assertNotEquals( "cluster should be ok", Health.RED, ei.getClusterHealth() ); assertEquals( "index should be ready", Health.GREEN, ei.getIndexHealth() ); @@ -597,8 +591,8 @@ public class EntityIndexTest extends BaseIT { IndexScope indexScope = new IndexScopeImpl( ownerId, "users" ); - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - entityIndex.initializeIndex(); + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex(applicationScope); + ei.initializeIndex(); final EntityIndexBatch batch = entityIndex.createBatch(); @@ -634,7 +628,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().get(); - entityIndex.refresh(); + ei.refresh(); final int limit = 1;
