Updated queue impl so that we now send an async signal to index
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/745f3557 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/745f3557 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/745f3557 Branch: refs/heads/two-dot-o-dev Commit: 745f3557843eff4a0006fa59913f7a4dbb592dff Parents: 6f87fff Author: Todd Nine <[email protected]> Authored: Fri Apr 17 15:13:24 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Apr 17 15:13:24 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 20 +- .../corepersistence/CpEntityManager.java | 97 ++---- .../corepersistence/CpEntityManagerFactory.java | 5 +- .../corepersistence/CpRelationManager.java | 8 +- .../index/AsyncIndexProvider.java | 114 +++++++ .../index/AsyncIndexService.java | 47 +++ .../corepersistence/index/BufferQueue.java | 68 ----- .../index/BufferQueueInMemoryImpl.java | 116 ------- .../index/BufferQueueSQSImpl.java | 306 ------------------- .../index/InMemoryAsyncIndexService.java | 75 +++++ .../corepersistence/index/IndexEntityEvent.java | 75 +++++ .../index/IndexQueueService.java | 45 --- .../corepersistence/index/QueueProvider.java | 112 ------- .../index/SQSAsyncIndexService.java | 263 ++++++++++++++++ .../index/SQSIndexQueueServiceImpl.java | 35 --- .../index/BufferQueueSQSImplTest.java | 179 ----------- .../index/SQSAsyncIndexServiceTest.java | 179 +++++++++++ 17 files changed, 802 insertions(+), 942 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 7e8af87..4758456 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -16,20 +16,17 @@ package org.apache.usergrid.corepersistence; -import org.apache.usergrid.corepersistence.index.BufferQueue; -import org.apache.usergrid.corepersistence.index.IndexService; -import org.apache.usergrid.corepersistence.index.IndexServiceImpl; -import org.apache.usergrid.corepersistence.index.QueryFig; -import org.apache.usergrid.corepersistence.index.QueueProvider; -import org.apache.usergrid.corepersistence.migration.*; -import org.apache.usergrid.persistence.PersistenceModule; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.safehaus.guicyfig.GuicyFigModule; -import org.springframework.context.ApplicationContext; import org.apache.usergrid.corepersistence.events.EntityDeletedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler; +import org.apache.usergrid.corepersistence.index.AsyncIndexProvider; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.IndexServiceImpl; +import org.apache.usergrid.corepersistence.index.QueryFig; +import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin; import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration; @@ -37,7 +34,6 @@ import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugi import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl; import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl; -import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.collection.event.EntityDeleted; import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; @@ -47,12 +43,12 @@ import org.apache.usergrid.persistence.core.guice.CommonModule; import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.guice.GraphModule; import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode; import org.apache.usergrid.persistence.index.guice.IndexModule; import com.google.inject.AbstractModule; -import com.google.inject.Provider; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; @@ -151,7 +147,7 @@ public class CoreModule extends AbstractModule { bind(IndexService.class).to(IndexServiceImpl.class); //bind the queue provider - bind( BufferQueue.class).toProvider( QueueProvider.class ); + bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class ); install( new GuicyFigModule( QueryFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 1506551..844892a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -178,7 +179,7 @@ public class CpEntityManager implements EntityManager { private final CounterUtils counterUtils; - private final IndexService indexService; + private final AsyncIndexService indexService; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -218,7 +219,7 @@ public class CpEntityManager implements EntityManager { * @param metricsFactory * @param applicationId */ - public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final IndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { + public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final AsyncIndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); @@ -722,7 +723,7 @@ public class CpEntityManager implements EntityManager { @Override public void updateApplication( Application app ) throws Exception { - update(app); + update( app ); this.application = app; } @@ -903,7 +904,7 @@ public class CpEntityManager implements EntityManager { } - return Collections.<EntityRef>singleton(new SimpleEntityRef(id.getType(), id.getUuid())); + return Collections.<EntityRef>singleton( new SimpleEntityRef( id.getType(), id.getUuid() ) ); } @@ -970,7 +971,7 @@ public class CpEntityManager implements EntityManager { public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception { - setProperty(entityRef, propertyName, propertyValue, false); + setProperty( entityRef, propertyName, propertyValue, false ); } @@ -988,7 +989,7 @@ public class CpEntityManager implements EntityManager { entity.getType(), propertyName, propertyValue ); entity.setProperty( propertyName, propertyValue ); - entity.setProperty(PROPERTY_MODIFIED, UUIDUtils.getTimestampInMillis(UUIDUtils.newTimeUUID())); + entity.setProperty( PROPERTY_MODIFIED, UUIDUtils.getTimestampInMillis( UUIDUtils.newTimeUUID() ) ); update( entity ); } @@ -1083,7 +1084,7 @@ public class CpEntityManager implements EntityManager { public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception { - addToDictionary(entityRef, dictionaryName, elementValue, null); + addToDictionary( entityRef, dictionaryName, elementValue, null ); } @@ -1144,7 +1145,7 @@ public class CpEntityManager implements EntityManager { EntityRef entity = get( entityRef ); UUID timestampUuid = UUIDUtils.newTimeUUID(); - Mutator<ByteBuffer> batch = createMutator(cass.getApplicationKeyspace(applicationId), be); + Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be ); for ( Map.Entry<?, ?> elementValue : elementValues.entrySet() ) { batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue.getKey(), @@ -1241,7 +1242,7 @@ public class CpEntityManager implements EntityManager { } Class<?> dictionaryCoType = - Schema.getDefaultSchema().getDictionaryValueType(entity.getType(), dictionaryName); + Schema.getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName ); boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType ); HColumn<ByteBuffer, ByteBuffer> result = @@ -1259,7 +1260,7 @@ public class CpEntityManager implements EntityManager { } } else { - logger.info("Results of CpEntityManagerImpl.getDictionaryElementValue is null"); + logger.info( "Results of CpEntityManagerImpl.getDictionaryElementValue is null" ); } return value; @@ -1283,7 +1284,7 @@ public class CpEntityManager implements EntityManager { } Class<?> dictionaryCoType = - Schema.getDefaultSchema().getDictionaryValueType(entity.getType(), dictionaryName); + Schema.getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName ); boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType ); ByteBuffer[] columnNames = new ByteBuffer[elementNames.length]; @@ -1340,7 +1341,7 @@ public class CpEntityManager implements EntityManager { @Override public Set<String> getDictionaries( EntityRef entity ) throws Exception { - return getDictionaryNames(entity); + return getDictionaryNames( entity ); } @@ -1386,7 +1387,7 @@ public class CpEntityManager implements EntityManager { throws Exception { return getRelationManager( get( entityId )) - .getCollection(collectionName, query, resultsLevel); + .getCollection( collectionName, query, resultsLevel ); } @@ -1496,7 +1497,7 @@ public class CpEntityManager implements EntityManager { EntityRef pairedEntity, String connectionType, EntityRef connectedEntityRef ) throws Exception { return getRelationManager( connectingEntity ) - .connectionRef(pairedConnectionType, pairedEntity, connectionType, connectedEntityRef); + .connectionRef( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef ); } @@ -1785,14 +1786,14 @@ public class CpEntityManager implements EntityManager { String roleTitle = batchRoleName; String propertyName = groupId + ":" + batchRoleName; Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER ); - properties.put("group", groupId); + properties.put( "group", groupId ); - Entity entity = batchCreateRole(roleName, roleTitle, inactivity, propertyName, groupId, properties); + Entity entity = batchCreateRole( roleName, roleTitle, inactivity, propertyName, groupId, properties ); getRelationManager( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ) ) - .addToCollection(COLLECTION_ROLES, entity); + .addToCollection( COLLECTION_ROLES, entity ); - logger.info("Created role {} with id {} in group {}", - new String[]{roleName, entity.getUuid().toString(), groupId.toString()}); + logger.info( "Created role {} with id {} in group {}", + new String[] { roleName, entity.getUuid().toString(), groupId.toString() } ); return entity; } @@ -1883,7 +1884,7 @@ public class CpEntityManager implements EntityManager { @Override public void grantUserPermission( UUID userId, String permission ) throws Exception { permission = permission.toLowerCase(); - addToDictionary(userRef(userId), DICTIONARY_PERMISSIONS, permission); + addToDictionary( userRef( userId ), DICTIONARY_PERMISSIONS, permission ); } @@ -1907,9 +1908,9 @@ public class CpEntityManager implements EntityManager { roleName = roleName.toLowerCase(); EntityRef userRef = userRef( userId ); EntityRef roleRef = getRoleRef( roleName ); - addToDictionary(userRef, DICTIONARY_ROLENAMES, roleName, roleName); - addToCollection(userRef, COLLECTION_ROLES, roleRef); - addToCollection(roleRef, COLLECTION_USERS, userRef); + addToDictionary( userRef, DICTIONARY_ROLENAMES, roleName, roleName ); + addToCollection( userRef, COLLECTION_ROLES, roleRef ); + addToCollection( roleRef, COLLECTION_USERS, userRef ); } @@ -1945,7 +1946,7 @@ public class CpEntityManager implements EntityManager { private EntityRef getRoleRef( String roleName ) throws Exception { Results results = this.searchCollection( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), - Schema.defaultCollectionName( Role.ENTITY_TYPE ), Query.fromQL( "roleName = '" + roleName + "'" )); + Schema.defaultCollectionName( Role.ENTITY_TYPE ), Query.fromQL( "roleName = '" + roleName + "'" ) ); Iterator<Entity> iterator = results.iterator(); EntityRef roleRef = null; while ( iterator.hasNext() ) { @@ -2191,8 +2192,8 @@ public class CpEntityManager implements EntityManager { @Override public Set<String> getCounterNames() throws Exception { Set<String> names = new TreeSet<String>( CASE_INSENSITIVE_ORDER ); - Set<String> nameSet = cast(getDictionaryAsSet(getApplicationRef(), Schema.DICTIONARY_COUNTERS)); - names.addAll(nameSet); + Set<String> nameSet = cast( getDictionaryAsSet( getApplicationRef(), Schema.DICTIONARY_COUNTERS ) ); + names.addAll( nameSet ); return names; } @@ -2264,7 +2265,7 @@ public class CpEntityManager implements EntityManager { public Entity get( UUID uuid ) throws Exception { MapManager mm = getMapManagerForTypes(); - String entityType = mm.getString(uuid.toString()); + String entityType = mm.getString( uuid.toString() ); final Entity entity; @@ -2288,7 +2289,7 @@ public class CpEntityManager implements EntityManager { final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner ); - MapManager mm = managerCache.getMapManager(ms); + MapManager mm = managerCache.getMapManager( ms ); return mm; } @@ -2391,7 +2392,7 @@ public class CpEntityManager implements EntityManager { @Override public Map<String, Role> getUserRolesWithTitles( UUID userId ) throws Exception { return getRolesWithTitles( - ( Set<String> ) cast( getDictionaryAsSet( userRef(userId), DICTIONARY_ROLENAMES ) ) ); + ( Set<String> ) cast( getDictionaryAsSet( userRef( userId ), DICTIONARY_ROLENAMES ) ) ); } @@ -2406,8 +2407,8 @@ public class CpEntityManager implements EntityManager { @Override public void addGroupToRole( UUID groupId, String roleName ) throws Exception { roleName = roleName.toLowerCase(); - addToDictionary(groupRef(groupId), DICTIONARY_ROLENAMES, roleName, roleName); - addToCollection(groupRef(groupId), COLLECTION_ROLES, getRoleRef(roleName)); + addToDictionary( groupRef( groupId ), DICTIONARY_ROLENAMES, roleName, roleName ); + addToCollection( groupRef( groupId ), COLLECTION_ROLES, getRoleRef( roleName ) ); } @@ -2428,7 +2429,7 @@ public class CpEntityManager implements EntityManager { @Override public void grantGroupPermission( UUID groupId, String permission ) throws Exception { permission = permission.toLowerCase(); - addToDictionary(groupRef(groupId), DICTIONARY_PERMISSIONS, permission); + addToDictionary( groupRef( groupId ), DICTIONARY_PERMISSIONS, permission ); } @@ -2891,38 +2892,6 @@ public class CpEntityManager implements EntityManager { } ); } - - void indexEntityIntoCollection( final Edge edge, final org.apache.usergrid.persistence.model.entity.Entity collectionMember) { - - throw new UnsupportedOperationException( "Use the new interface!" ); -// -// final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope() ); -// final EntityIndexBatch batch = aie.createBatch(); -// -// // index member into entity collection | type scope -// final IndexEdge collectionIndexScope = generateScopeFromSource( edge ); -// batch.index( collectionIndexScope, collectionMember ); -// -// //TODO REMOVE INDEX CODE -// // // index member into entity | all-types scope -// // IndexScope entityAllTypesScope = new IndexScopeImpl( -// // collectionEntity.getId(), -// // CpNamingUtils.ALL_TYPES, entityType ); -// // -// // batch.index(entityAllTypesScope, memberEntity); -// // -// // // index member into application | all-types scope -// // IndexScope appAllTypesScope = new IndexScopeImpl( -// // getApplicationScope().getApplication(), -// // CpNamingUtils.ALL_TYPES, entityType ); -// // -// // batch.index(appAllTypesScope, memberEntity); -// -// //Adding graphite metrics -// Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time(); -// batch.execute(); -// timeIndexEntityCollection.stop(); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 6ad8616..da1febc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -40,6 +40,7 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; @@ -111,7 +112,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private Injector injector; private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; - private final IndexService indexService; + private final AsyncIndexService indexService; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector) { @@ -123,7 +124,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.entityIndexFactory = injector.getInstance(EntityIndexFactory.class); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); - this.indexService = injector.getInstance( IndexService.class ); + this.indexService = injector.getInstance( AsyncIndexService.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index a0a44b3..db9816e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; @@ -120,13 +121,13 @@ public class CpRelationManager implements RelationManager { private final ApplicationScope applicationScope; - private final IndexService indexService; + private final AsyncIndexService indexService; private MetricsFactory metricsFactory; private Timer updateCollectionTimer; - public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final IndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { + public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final AsyncIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { Assert.notNull( em, "Entity manager cannot be null" ); @@ -481,7 +482,8 @@ public class CpRelationManager implements RelationManager { logger.debug( "Wrote edge {}", edge ); } - ( ( CpEntityManager ) em ).indexEntityIntoCollection( edge, memberEntity ); + indexService.queueEntityIndexUpdate( applicationScope, memberEntity.getId(), memberEntity.getVersion() ); + if ( logger.isDebugEnabled() ) { logger.debug( "Added entity {}:{} to collection {}", new Object[] { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java new file mode 100644 index 0000000..8257c94 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; + + +/** + * A provider to allow users to configure their queue impl via properties + */ +@Singleton +public class AsyncIndexProvider implements Provider<AsyncIndexService> { + + private final QueryFig queryFig; + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final QueueManagerFactory queueManagerFactory; + private final MetricsFactory metricsFactory; + private final IndexService indexService; + + private AsyncIndexService asyncIndexService; + + + @Inject + public AsyncIndexProvider( final QueryFig queryFig, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, + final IndexService indexService ) { + this.queryFig = queryFig; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.queueManagerFactory = queueManagerFactory; + this.metricsFactory = metricsFactory; + this.indexService = indexService; + } + + + @Override + @Singleton + public AsyncIndexService get() { + if ( asyncIndexService == null ) { + asyncIndexService = getIndexService(); + } + + + return asyncIndexService; + } + + + private AsyncIndexService getIndexService() { + final String value = queryFig.getQueueImplementation(); + + final Implementations impl = Implementations.valueOf( value ); + + switch ( impl ) { + case LOCAL: + return new InMemoryAsyncIndexService( indexService, entityCollectionManagerFactory ); + case SQS: + return new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); + default: + throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); + } + } + + + private String getErrorValues() { + String values = ""; + + for ( final Implementations impl : Implementations.values() ) { + values += impl + ", "; + } + + values = values.substring( 0, values.length() - 2 ); + + return values; + } + + + /** + * Different implementations + */ + public static enum Implementations { + LOCAL, + SQS; + + + public String asString() { + return toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java new file mode 100644 index 0000000..d1f2fb6 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Low level queue service for indexing entities + */ +public interface AsyncIndexService { + + + /** + * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) + * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. + * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. + * @param applicationScope + * @param entityId + * @param version + */ + void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, final UUID version ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java deleted file mode 100644 index fc1bdb7..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; - - -/** - * A temporary interface of our buffer Q to decouple of producer and consumer; - */ -public interface BufferQueue { - - /** - * Offer the indexoperation message. Some queues may support not returning the future until ack or fail. - * Other queues may return the future after ack on the offer. See the implementation documentation for details. - * @param operation - */ - public void offer(final IndexOperationMessage operation); - - - /** - * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed. - * May return less than the take size, but will never return null - * - * @param takeSize - * @param timeout - * @param timeUnit - * @return A null safe lid - */ - public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit ); - - - /** - * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented. - * This will set the future as done in in memory operations - * - * @param messages - */ - public void ack(final List<IndexOperationMessage> messages); - - /** - * Mark these message as failed. Set the exception in the future on local operation - * - * @param messages - */ - public void fail(final List<IndexOperationMessage> messages, final Throwable t); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java deleted file mode 100644 index 0e43da3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.core.future.BetterFuture; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -@Singleton -public class BufferQueueInMemoryImpl implements BufferQueue { - - - private final QueryFig fig; - private final ArrayBlockingQueue<IndexOperationMessage> messages; - - - @Inject - public BufferQueueInMemoryImpl( final QueryFig fig ) { - this.fig = fig; - messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() ); - } - - - @Override - public void offer( final IndexOperationMessage operation ) { - try { - messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS ); - } - catch ( InterruptedException e ) { - throw new RuntimeException("Unable to offer message to queue", e); - } - } - - - @Override - public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - final List<IndexOperationMessage> response = new ArrayList<>( takeSize ); - try { - - - messages.drainTo( response, takeSize ); - - //we got something, go process it - if ( response.size() > 0 ) { - return response; - } - - - final IndexOperationMessage polled = messages.poll( timeout, timeUnit ); - - if ( polled != null ) { - response.add( polled ); - - //try to add more - messages.drainTo( response, takeSize - 1 ); - } - } - catch ( InterruptedException e ) { - //swallow - } - - - return response; - } - - - @Override - public void ack( final List<IndexOperationMessage> messages ) { - //if we have a future ack it - for ( final IndexOperationMessage op : messages ) { - op.done(); - } - } - - - @Override - public void fail( final List<IndexOperationMessage> messages, final Throwable t ) { - - - for ( final IndexOperationMessage op : messages ) { - final BetterFuture<IndexOperationMessage> future = op.getFuture(); - - if ( future != null ) { - future.setError( t ); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java deleted file mode 100644 index d955014..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -/** - * This is experimental at best. Our SQS size limit is a problem. We shouldn't use this for index operation. Only for - * performing - */ -@Singleton -public class BufferQueueSQSImpl implements BufferQueue { - - private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class ); - - /** Hacky, copied from CPEntityManager b/c we can't access it here */ - public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); - - - /** - * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed - */ - public static final int TTL = 60 * 60 * 24 * 30; - - /** - * The name to put in the map - */ - public static final String MAP_NAME = "esqueuedata"; - - - private static final String QUEUE_NAME = "es_queue"; - - private static SmileFactory SMILE_FACTORY = new SmileFactory(); - - - static { - SMILE_FACTORY.delegateToTextual( true ); - } - - - private final QueueManager queue; - private final MapManager mapManager; - private final QueryFig queryFig; - private final ObjectMapper mapper; - private final Meter readMeter; - private final Timer readTimer; - private final Meter writeMeter; - private final Timer writeTimer; - - - @Inject - public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, - final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { - final QueueScope queueScope = - new QueueScopeImpl( QUEUE_NAME ); - - this.queue = queueManagerFactory.getQueueManager( queueScope ); - this.queryFig = queryFig; - - final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME ); - - this.mapManager = mapManagerFactory.createMapManager( scope ); - - - this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" ); - this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" ); - - this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" ); - this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" ); - - this.mapper = new ObjectMapper( SMILE_FACTORY ); - //pretty print, disabling for speed - // mapper.enable(SerializationFeature.INDENT_OUTPUT); - - } - - - @Override - public void offer( final IndexOperationMessage operation ) { - - //no op - if(operation.isEmpty()){ - operation.getFuture().done(); - return; - } - - final Timer.Context timer = this.writeTimer.time(); - this.writeMeter.mark(); - - final UUID identifier = UUIDGenerator.newTimeUUID(); - - try { - - final String payLoad = toString( operation ); - - //write to cassandra - this.mapManager.putString( identifier.toString(), payLoad, TTL ); - - //signal to SQS - this.queue.sendMessage( identifier ); - operation.done(); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - - @Override - public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - //SQS doesn't support more than 10 - - final int actualTake = Math.min( 10, takeSize ); - - final Timer.Context timer = this.readTimer.time(); - - try { - - List<QueueMessage> messages = queue - .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), - String.class ); - - - - final List<IndexOperationMessage> response = new ArrayList<>( messages.size() ); - - final List<String> mapEntries = new ArrayList<>( messages.size() ); - - - if(messages.size() == 0){ - return response; - } - - //add all our keys for a single round trip - for ( final QueueMessage message : messages ) { - mapEntries.add( message.getBody().toString() ); - } - - //look up the values - final Map<String, String> storedCommands = mapManager.getStrings( mapEntries ); - - - //load them into our response - for ( final QueueMessage message : messages ) { - - final String key = getMessageKey( message ); - - //now see if the key was there - final String payload = storedCommands.get( key ); - - //the entry was not present in cassandra, ignore this message. Failure should eventually kick it to - // a DLQ - - if ( payload == null ) { - continue; - } - - final IndexOperationMessage messageBody; - - try { - messageBody = fromString( payload ); - } - catch ( IOException e ) { - logger.error( "Unable to deserialize message from string. This is a bug", e ); - throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); - } - - SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); - - response.add( operation ); - } - - readMeter.mark( response.size() ); - return response; - } - //stop our timer - finally { - timer.stop(); - } - } - - - @Override - public void ack( final List<IndexOperationMessage> messages ) { - - //nothing to do - if ( messages.size() == 0 ) { - return; - } - - List<QueueMessage> toAck = new ArrayList<>( messages.size() ); - - for ( IndexOperationMessage ioe : messages ) { - - - final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; - - final String key = getMessageKey( sqsIndexOperationMessage.getMessage() ); - - //remove it from the map - mapManager.delete( key ); - - toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); - } - - queue.commitMessages( toAck ); - } - - - @Override - public void fail( final List<IndexOperationMessage> messages, final Throwable t ) { - //no op, just let it retry after the queue timeout - } - - - /** Read the object from Base64 string. */ - private IndexOperationMessage fromString( String s ) throws IOException { - IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class ); - return o; - } - - - /** Write the object to a Base64 string. */ - private String toString( IndexOperationMessage o ) throws IOException { - return mapper.writeValueAsString( o ); - } - - private String getMessageKey(final QueueMessage message){ - return message.getBody().toString(); - } - - /** - * The message that subclasses our IndexOperationMessage. holds a pointer to the original message - */ - public class SqsIndexOperationMessage extends IndexOperationMessage { - - private final QueueMessage message; - - - public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) { - this.message = message; - this.addAllDeIndexRequest( source.getDeIndexRequests() ); - this.addAllIndexRequest( source.getIndexRequests() ); - } - - - /** - * Get the message from our queue - */ - public QueueMessage getMessage() { - return message; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java new file mode 100644 index 0000000..fdc9c65 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; +import rx.schedulers.Schedulers; + + +@Singleton +public class InMemoryAsyncIndexService implements AsyncIndexService { + + private final IndexService indexService; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public InMemoryAsyncIndexService( final IndexService indexService, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) {this.indexService = indexService; + + + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, + final UUID version ) { + + final IndexEntityEvent event = new IndexEntityEvent( applicationScope, entityId, version ); + + //process the entity immediately + //only process the same version, otherwise ignore + + getEntity( applicationScope, entityId).filter( entity-> version.equals(entity.hasVersion() )).doOnNext( entity -> { + indexService.indexEntity( applicationScope, entity ); + } ).subscribeOn( Schedulers.io() ).subscribe(); + + + } + + private Observable<Entity> getEntity( final ApplicationScope applicationScope, final Id entityId){ + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + return ecm.load( entityId ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexEntityEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexEntityEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexEntityEvent.java new file mode 100644 index 0000000..9f81f34 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexEntityEvent.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.io.Serializable; +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * The immutable serializable event that represents and index operations + */ +public class IndexEntityEvent implements Serializable { + + public ApplicationScope applicationScope; + public Id entityId; + public UUID entityVersion; + + + public IndexEntityEvent( final ApplicationScope applicationScope, final Id entityId, final UUID entityVersion ) { + this.applicationScope = applicationScope; + this.entityId = entityId; + this.entityVersion = entityVersion; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public void setApplicationScope( final ApplicationScope applicationScope ) { + this.applicationScope = applicationScope; + } + + + public Id getEntityId() { + return entityId; + } + + + public void setEntityId( final Id entityId ) { + this.entityId = entityId; + } + + + public UUID getEntityVersion() { + return entityVersion; + } + + + public void setEntityVersion( final UUID entityVersion ) { + this.entityVersion = entityVersion; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java deleted file mode 100644 index 8d7f222..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; - - -/** - * Low level queue service for indexing entities - */ -public interface IndexQueueService { - - - /** - * Queue an entity to be index asynchronously - * @param applicationScope - * @param entityId - * @param version - */ - void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java deleted file mode 100644 index d3920db..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; - - -/** - * A provider to allow users to configure their queue impl via properties - */ -@Singleton -public class QueueProvider implements Provider<BufferQueue> { - - private final QueryFig queryFig; - - private final QueueManagerFactory queueManagerFactory; - private final MapManagerFactory mapManagerFactory; - private final MetricsFactory metricsFactory; - - private BufferQueue bufferQueue; - - - @Inject - public QueueProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, - final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { - this.queryFig = queryFig; - - - this.queueManagerFactory = queueManagerFactory; - this.mapManagerFactory = mapManagerFactory; - this.metricsFactory = metricsFactory; - } - - - @Override - @Singleton - public BufferQueue get() { - if ( bufferQueue == null ) { - bufferQueue = getQueue(); - } - - - return bufferQueue; - } - - - private BufferQueue getQueue() { - final String value = queryFig.getQueueImplementation(); - - final Implementations impl = Implementations.valueOf( value ); - - switch ( impl ) { - case LOCAL: - return new BufferQueueInMemoryImpl( queryFig ); - case SQS: - return new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory ); - default: - throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); - } - } - - - private String getErrorValues() { - String values = ""; - - for ( final Implementations impl : Implementations.values() ) { - values += impl + ", "; - } - - values = values.substring( 0, values.length() - 2 ); - - return values; - } - - - /** - * Different implementations - */ - public static enum Implementations { - LOCAL, - SQS; - - - public String asString() { - return toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java new file mode 100644 index 0000000..cad20bd --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +@Singleton +public class SQSAsyncIndexService implements AsyncIndexService { + + + private static final Logger logger = LoggerFactory.getLogger( SQSAsyncIndexService.class ); + + /** Hacky, copied from CPEntityManager b/c we can't access it here */ + public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); + + + /** + * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed + */ + public static final int TTL = 60 * 60 * 24 * 30; + + /** + * The name to put in the map + */ + public static final String MAP_NAME = "esqueuedata"; + + + private static final String QUEUE_NAME = "es_queue"; + + private static SmileFactory SMILE_FACTORY = new SmileFactory(); + + static { + SMILE_FACTORY.delegateToTextual( true ); + } + + + private final QueueManager queue; + private final QueryFig queryFig; + private final ObjectMapper mapper; + private final Meter readMeter; + private final Timer readTimer; + private final Meter writeMeter; + private final Timer writeTimer; + + + @Inject + public SQSAsyncIndexService( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, + final MetricsFactory metricsFactory ) { + final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); + + this.queue = queueManagerFactory.getQueueManager( queueScope ); + this.queryFig = queryFig; + + this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "write.timer" ); + this.writeMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, "write.meter" ); + + this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "read.timer" ); + this.readMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, "read.meter" ); + + this.mapper = new ObjectMapper( SMILE_FACTORY ); + //pretty print, disabling for speed + // mapper.enable(SerializationFeature.INDENT_OUTPUT); + + } + + + public void offer( final IndexEntityEvent operation ) { + final Timer.Context timer = this.writeTimer.time(); + this.writeMeter.mark(); + + final UUID identifier = UUIDGenerator.newTimeUUID(); + + try { + + final String payLoad = toString( operation ); + + //signal to SQS + this.queue.sendMessage( identifier ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + + public List<IndexEntityEvent> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { + + //SQS doesn't support more than 10 + + final int actualTake = Math.min( 10, takeSize ); + + final Timer.Context timer = this.readTimer.time(); + + try { + + List<QueueMessage> messages = queue + .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), + String.class ); + + + final List<IndexEntityEvent> response = new ArrayList<>( messages.size() ); + + final List<String> mapEntries = new ArrayList<>( messages.size() ); + + + if ( messages.size() == 0 ) { + return Collections.emptyList(); + } + + //add all our keys for a single round trip + for ( final QueueMessage message : messages ) { + mapEntries.add( message.getBody().toString() ); + } + + + //load them into our response + for ( final QueueMessage message : messages ) { + + final String payload = getBody( message ); + + //now see if the key was there + + + final IndexEntityEvent messageBody; + + try { + messageBody = fromString( payload ); + } + catch ( IOException e ) { + logger.error( "Unable to deserialize message from string. This is a bug", e ); + throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); + } + + SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); + + response.add( operation ); + } + + readMeter.mark( response.size() ); + return response; + } + //stop our timer + finally { + timer.stop(); + } + } + + + public void ack( final List<IndexEntityEvent> messages ) { + + //nothing to do + if ( messages.size() == 0 ) { + return; + } + + List<QueueMessage> toAck = new ArrayList<>( messages.size() ); + + for ( IndexEntityEvent ioe : messages ) { + + + final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; + + toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); + } + + queue.commitMessages( toAck ); + } + + + /** Read the object from Base64 string. */ + private IndexEntityEvent fromString( String s ) throws IOException { + IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class ); + return o; + } + + + /** Write the object to a Base64 string. */ + private String toString( IndexEntityEvent o ) throws IOException { + return mapper.writeValueAsString( o ); + } + + + private String getBody( final QueueMessage message ) { + return message.getBody().toString(); + } + + + /** + * The message that subclasses our IndexOperationMessage. holds a pointer to the original message + */ + public class SqsIndexOperationMessage extends IndexEntityEvent { + + private final QueueMessage message; + + + public SqsIndexOperationMessage( final QueueMessage message, final IndexEntityEvent source ) { + super( source.getApplicationScope(), source.getEntityId(), source.getEntityVersion() ); + this.message = message; + } + + + /** + * Get the message from our queue + */ + public QueueMessage getMessage() { + return message; + } + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, + final UUID version ) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java deleted file mode 100644 index 42f36b1..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - - -public class SQSIndexQueueServiceImpl implements IndexQueueService { - - @Override - public void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java deleted file mode 100644 index a76a589..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.corepersistence.TestIndexModule; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.impl.DeIndexRequest; -import org.apache.usergrid.persistence.index.impl.EsRunner; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.index.impl.IndexRequest; -import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider; - -import com.google.inject.Inject; - -import net.jcip.annotations.NotThreadSafe; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - - -@RunWith(EsRunner.class) -@UseModules({ TestIndexModule.class }) -@NotThreadSafe -public class BufferQueueSQSImplTest { - - - @Inject - @Rule - public MigrationManagerRule migrationManagerRule; - - - @Rule - public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); - - @Inject - public QueueManagerFactory queueManagerFactory; - - @Inject - public QueryFig queryFig; - - @Inject - public MapManagerFactory mapManagerFactory; - - @Inject - public MetricsFactory metricsFactory; - - - private BufferQueueSQSImpl bufferQueueSQS; - - @Before - public void setup(){ - bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory ); - } - - - - - @Test - public void testMessageIndexing(){ - - ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application")); - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null ); - assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null ); - - final Map<String, Object> request1Data = new HashMap<String, Object>() {{put("test", "testval1");}}; - final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testDoc1",request1Data ); - - - final Map<String, Object> request2Data = new HashMap<String, Object>() {{put("test", "testval2");}}; - final IndexRequest indexRequest2 = new IndexRequest( "testAlias2", "testDoc2",request2Data ); - - - //de-index request - final DeIndexRequest - deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId3"),"name3", - - - SearchEdge.NodeType.SOURCE ), new SimpleId("id3"), UUID.randomUUID() ); - - final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId4"),"name4", - SearchEdge.NodeType.SOURCE ), new SimpleId("id4"), UUID.randomUUID() ); - - - - - IndexOperationMessage indexOperationMessage = new IndexOperationMessage(); - indexOperationMessage.addIndexRequest( indexRequest1); - indexOperationMessage.addIndexRequest( indexRequest2); - - indexOperationMessage.addDeIndexRequest( deIndexRequest1 ); - indexOperationMessage.addDeIndexRequest( deIndexRequest2 ); - - bufferQueueSQS.offer( indexOperationMessage ); - - //wait for it to send to SQS - indexOperationMessage.getFuture().get(); - - //now get it back - - final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS ); - - assertTrue(ops.size() > 0); - - final IndexOperationMessage returnedOperation = ops.get( 0 ); - - //get the operations out - - final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests(); - - assertTrue(indexRequestSet.contains(indexRequest1)); - assertTrue(indexRequestSet.contains(indexRequest2)); - - - final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests(); - - assertTrue( deIndexRequests.contains( deIndexRequest1 ) ); - assertTrue( deIndexRequests.contains( deIndexRequest2 ) ); - - - - //now ack the message - - bufferQueueSQS.ack( ops ); - - } - - private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){ - final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout ); - - List<IndexOperationMessage> ops; - - do{ - ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS ); - }while((ops == null || ops.size() == 0 ) && System.currentTimeMillis() < endTime); - - return ops; - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/745f3557/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java new file mode 100644 index 0000000..0977caa --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.impl.DeIndexRequest; +import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.index.impl.IndexRequest; +import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider; + +import com.google.inject.Inject; + +import net.jcip.annotations.NotThreadSafe; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + + +@RunWith(EsRunner.class) +@UseModules({ TestIndexModule.class }) +@NotThreadSafe +public class SQSAsyncIndexServiceTest { + + + @Inject + @Rule + public MigrationManagerRule migrationManagerRule; + + + @Rule + public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); + + @Inject + public QueueManagerFactory queueManagerFactory; + + @Inject + public QueryFig queryFig; + + + @Inject + public MetricsFactory metricsFactory; + + + private SQSAsyncIndexService bufferQueueSQS; + + @Before + public void setup(){ + bufferQueueSQS = new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); + } + + + + + @Test + public void testMessageIndexing(){ + + fail("fix me"); +// ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application")); +// final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); +// assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null ); +// assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null ); +// +// final Map<String, Object> request1Data = new HashMap<String, Object>() {{put("test", "testval1");}}; +// final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testDoc1",request1Data ); +// +// +// final Map<String, Object> request2Data = new HashMap<String, Object>() {{put("test", "testval2");}}; +// final IndexRequest indexRequest2 = new IndexRequest( "testAlias2", "testDoc2",request2Data ); +// +// +// //de-index request +// final DeIndexRequest +// deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId3"),"name3", +// +// +// SearchEdge.NodeType.SOURCE ), new SimpleId("id3"), UUID.randomUUID() ); +// +// final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId4"),"name4", +// SearchEdge.NodeType.SOURCE ), new SimpleId("id4"), UUID.randomUUID() ); +// +// +// +// +// IndexOperationMessage indexOperationMessage = new IndexOperationMessage(); +// indexOperationMessage.addIndexRequest( indexRequest1); +// indexOperationMessage.addIndexRequest( indexRequest2); +// +// indexOperationMessage.addDeIndexRequest( deIndexRequest1 ); +// indexOperationMessage.addDeIndexRequest( deIndexRequest2 ); +// +// bufferQueueSQS.offer( indexOperationMessage ); +// +// //wait for it to send to SQS +// indexOperationMessage.getFuture().get(); +// +// //now get it back +// +// final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS ); +// +// assertTrue(ops.size() > 0); +// +// final IndexOperationMessage returnedOperation = ops.get( 0 ); +// +// //get the operations out +// +// final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests(); +// +// assertTrue(indexRequestSet.contains(indexRequest1)); +// assertTrue(indexRequestSet.contains(indexRequest2)); +// +// +// final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests(); +// +// assertTrue( deIndexRequests.contains( deIndexRequest1 ) ); +// assertTrue( deIndexRequests.contains( deIndexRequest2 ) ); +// +// +// +// //now ack the message +// +// bufferQueueSQS.ack( ops ); + + } + +// private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){ +// final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout ); +// +// List<IndexOperationMessage> ops; +// +// do{ +// ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS ); +// }while((ops == null || ops.size() == 0 ) && System.currentTimeMillis() < endTime); +// +// return ops; +// } + + + + +}
