Updated tests and providers for testing all async index services and the index service itself
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f4d0d1a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f4d0d1a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f4d0d1a9 Branch: refs/heads/two-dot-o-dev Commit: f4d0d1a95d8a0f5fc27eae732e61cab3d4940783 Parents: 2a58270 Author: Todd Nine <tn...@apigee.com> Authored: Wed Apr 22 14:23:35 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Wed Apr 22 14:23:35 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 8 +- .../corepersistence/CpEntityManager.java | 6 +- .../corepersistence/CpEntityManagerFactory.java | 6 +- .../corepersistence/CpRelationManager.java | 6 +- .../index/AsyncIndexProvider.java | 24 +- .../index/AsyncIndexService.java | 42 +++ .../index/AsyncReIndexService.java | 42 --- .../index/InMemoryAsyncIndexService.java | 88 +++++ .../index/InMemoryAsyncReIndexService.java | 108 ------ .../index/IndexProcessorFig.java | 86 +++++ .../corepersistence/index/IndexService.java | 6 +- .../corepersistence/index/IndexServiceImpl.java | 9 +- .../corepersistence/index/QueryFig.java | 103 ------ .../index/ReIndexServiceImpl.java | 12 +- .../index/SQSAsyncIndexService.java | 325 +++++++++++++++++++ .../index/SQSAsyncReIndexService.java | 269 --------------- .../corepersistence/util/CpNamingUtils.java | 8 +- .../util/SerializableMapper.java | 4 +- .../index/AsyncIndexServiceTest.java | 203 ++++++++++++ .../index/InMemoryAsycIndexServiceTest.java | 64 ++++ .../corepersistence/index/IndexServiceTest.java | 233 ++++++++++++- .../index/SQSAsyncIndexServiceTest.java | 143 +++----- .../impl/migration/EntityIdScope.java | 40 ++- .../migration/data/MigrationRelationship.java | 29 +- .../migration/data/VersionedMigrationSet.java | 31 +- .../persistence/core/rx/RxSchedulerFig.java | 4 +- 
.../core/scope/ApplicationScope.java | 4 + .../core/scope/ApplicationScopeImpl.java | 21 +- .../data/VersionedMigrationSetTest.java | 8 +- .../usergrid/persistence/model/entity/Id.java | 3 + .../persistence/model/entity/SimpleId.java | 37 ++- .../index/impl/IndexOperationMessage.java | 8 - 32 files changed, 1259 insertions(+), 721 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 51972d8..06fe058 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -22,10 +22,10 @@ import org.apache.usergrid.corepersistence.events.EntityDeletedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler; import org.apache.usergrid.corepersistence.index.AsyncIndexProvider; -import org.apache.usergrid.corepersistence.index.AsyncReIndexService; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.index.IndexServiceImpl; -import org.apache.usergrid.corepersistence.index.QueryFig; +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin; import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; @@ -151,9 +151,9 @@ public class CoreModule extends 
AbstractModule { bind( IndexService.class ).to( IndexServiceImpl.class ); //bind the queue provider - bind( AsyncReIndexService.class).toProvider( AsyncIndexProvider.class ); + bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class ); - install( new GuicyFigModule( QueryFig.class ) ); + install( new GuicyFigModule( IndexProcessorFig.class ) ); install( new GuicyFigModule( ApplicationIdCacheFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 72ca955..23514c8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncReIndexService; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.AggregateCounter; @@ -175,7 +175,7 @@ public class CpEntityManager implements EntityManager { private final CounterUtils counterUtils; - private final AsyncReIndexService indexService; + private final AsyncIndexService indexService; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -215,7 +215,7 @@ public class CpEntityManager implements EntityManager { * @param metricsFactory * @param applicationId */ - public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final 
AsyncReIndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { + public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final AsyncIndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 46db3f8..f08bce4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; -import org.apache.usergrid.corepersistence.index.AsyncReIndexService; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; @@ -122,7 +122,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private Injector injector; private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; - private final AsyncReIndexService indexService; + private final AsyncIndexService indexService; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils 
counterUtils, final Injector injector) { @@ -134,7 +134,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.entityIndexFactory = injector.getInstance(EntityIndexFactory.class); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); - this.indexService = injector.getInstance( AsyncReIndexService.class ); + this.indexService = injector.getInstance( AsyncIndexService.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 8f125ad..ea554a3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncReIndexService; +import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; @@ -118,13 +118,13 @@ public class CpRelationManager implements RelationManager { private final ApplicationScope applicationScope; - private final AsyncReIndexService indexService; + private final AsyncIndexService indexService; private MetricsFactory 
metricsFactory; private Timer updateCollectionTimer; - public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final AsyncReIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { + public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final AsyncIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { Assert.notNull( em, "Entity manager cannot be null" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index 18df824..0043166 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.index; -import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; @@ -35,9 +34,9 @@ import com.google.inject.Singleton; * A provider to allow users to configure their queue impl via properties */ @Singleton -public class AsyncIndexProvider implements Provider<AsyncReIndexService> { +public class AsyncIndexProvider implements Provider<AsyncIndexService> { - private final QueryFig queryFig; + private final IndexProcessorFig indexProcessorFig; private final QueueManagerFactory queueManagerFactory; private 
final MetricsFactory metricsFactory; @@ -45,15 +44,15 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private AsyncReIndexService asyncIndexService; + private AsyncIndexService asyncIndexService; @Inject - public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, + public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, final IndexService indexService, final RxTaskScheduler rxTaskScheduler, final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.queryFig = queryFig; + this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; this.indexService = indexService; @@ -64,7 +63,7 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { @Override @Singleton - public AsyncReIndexService get() { + public AsyncIndexService get() { if ( asyncIndexService == null ) { asyncIndexService = getIndexService(); } @@ -74,17 +73,18 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { } - private AsyncReIndexService getIndexService() { - final String value = queryFig.getQueueImplementation(); + private AsyncIndexService getIndexService() { + final String value = indexProcessorFig.getQueueImplementation(); final Implementations impl = Implementations.valueOf( value ); switch ( impl ) { case LOCAL: - return new InMemoryAsyncReIndexService( indexService, rxTaskScheduler, - entityCollectionManagerFactory, metricsFactory ); + return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, + entityCollectionManagerFactory ); case SQS: - return new SQSAsyncReIndexService( queueManagerFactory, queryFig, metricsFactory ); + return new SQSAsyncIndexService( 
queueManagerFactory, indexProcessorFig, metricsFactory, indexService, + entityCollectionManagerFactory, rxTaskScheduler ); default: throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java new file mode 100644 index 0000000..dac92c8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; + + +/** + * Low level queue service for indexing entities + */ +public interface AsyncIndexService extends ReIndexAction { + + + /** + * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) + * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. + * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. + * @param applicationScope + * @param entity The entity to index + */ + void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java deleted file mode 100644 index c6eedd7..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; - - -/** - * Low level queue service for indexing entities - */ -public interface AsyncReIndexService extends ReIndexAction { - - - /** - * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) - * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. - * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. 
- * @param applicationScope - * @param entity The entity to index - */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java new file mode 100644 index 0000000..e8d178c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + + +@Singleton +public class InMemoryAsyncIndexService implements AsyncIndexService { + + private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncIndexService.class ); + + private final IndexService indexService; + private final RxTaskScheduler rxTaskScheduler; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.indexService = indexService; + this.rxTaskScheduler = rxTaskScheduler; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { + + //process the entity immediately + //only process the same version, otherwise ignore + + + log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); + + //start it in the background on an i/o thread + edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + } + + + @Override + 
public void index( final EntityIdScope entityIdScope ) { + + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + + final Id entityId = entityIdScope.getId(); + + //load the entity + entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) + //perform indexing on the task scheduler and start it + .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ) + .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java deleted file mode 100644 index 4908945..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.index; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.codahale.metrics.Timer; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; - - -@Singleton -public class InMemoryAsyncReIndexService implements AsyncReIndexService { - - private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncReIndexService.class ); - private final IndexService indexService; - private final RxTaskScheduler rxTaskScheduler; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final Timer timer; - - - @Inject - public InMemoryAsyncReIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory, final - MetricsFactory metricsFactory ) { - this.indexService = indexService; - this.rxTaskScheduler = rxTaskScheduler; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - - timer = metricsFactory.getTimer( InMemoryAsyncReIndexService.class, "IndexTimer" ); - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { - - //process the entity immediately - //only process the same version, otherwise ignore - - - log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); - - 
final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); - - - - edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); - - //now start it -// final Timer.Context time = timer.time(); -// -// edgeObservable.connect(); -// -// time.stop(); - - - } - - - @Override - public void index( final EntityIdScope entityIdScope ) { - - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - - final Id entityId = entityIdScope.getId(); - - final Entity entity = - entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).toBlocking() - .lastOrDefault( null ); - - - if ( entity == null ) { - log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope ); - } - - indexService.indexEntity( applicationScope, entity ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java new file mode 100644 index 0000000..1e8abff --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.index; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * Application id cache fig + */ +public interface IndexProcessorFig extends GuicyFig { + + + /** + * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple + * backpressure + */ + String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait"; + + /** + * The number of worker threads to consume from the queue + */ + String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; + + + /** + * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> + */ + String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; + + + /** + * The queue implementation to use. 
Values come from <class>QueueProvider.Implementations</class> + */ + String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout"; + + /** + * Amount of time to wait when reading from the queue + */ + String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout"; + + + + @Default( "1000" ) + @Key( FAILURE_REJECTED_RETRY_WAIT_TIME ) + long getFailureRetryTime(); + + //give us 60 seconds to process the message + @Default( "60" ) + @Key( INDEX_QUEUE_READ_TIMEOUT ) + int getIndexQueueTimeout(); + + @Default( "1" ) + @Key( ELASTICSEARCH_WORKER_COUNT ) + int getWorkerCount(); + + @Default( "LOCAL" ) + @Key( ELASTICSEARCH_QUEUE_IMPL ) + String getQueueImplementation(); + + + @Default("30000") + @Key("elasticsearch.reindex.sample.interval") + long getReIndexSampleInterval(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java index 5e9392b..18ab2b7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -41,9 +41,9 @@ public interface IndexService { * @param applicationScope The scope of the entity * @param entity The entity * - * @return A ConnectableObservable with every edge in the batch to index the entity. Note that this a cold observable - * and must be subscribed to, then "connect" in order to perform the operation. This also makes no assumptions on scheduling. It is up to the caller - * to assign the correct scheduler to the observable + * @return An Observable with executed batch future as an observable. 
Note that this a cold observable + * and must be subscribed to in order to perform the index operations. This also makes no assumptions on scheduling. It is up to the caller + * to assign the correct scheduler to the observable based on their threading needs */ Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope, final Entity entity ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 87e22f4..81bf6cb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -99,7 +99,7 @@ public class IndexServiceImpl implements IndexService { //we might or might not need to index from target-> source - final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId ); + final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId ); //merge the edges together @@ -129,12 +129,13 @@ public class IndexServiceImpl implements IndexService { /** - * Get index edges to the target + * Get index edges to the target. 
Used in only certain entity types, such as roles, users, groups etc + * where we doubly index on both directions of the edge * * @param graphManager The graph manager - * @param entityId The entitie's id + * @param entityId The entity's id */ - private Observable<IndexEdge> getIndexEdgesToTarget( final GraphManager graphManager, final Id entityId ) { + private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager graphManager, final Id entityId ) { final String collectionName = InflectionUtils.pluralize( entityId.getType() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java deleted file mode 100644 index 82ed496..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.usergrid.corepersistence.index; - - -import org.safehaus.guicyfig.Default; -import org.safehaus.guicyfig.GuicyFig; -import org.safehaus.guicyfig.Key; - - -/** - * Application id cache fig - */ -public interface QueryFig extends GuicyFig { - - - /** - * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple - * backpressure - */ - public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait"; - - /** - * The number of worker threads to consume from the queue - */ - public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; - - /** - * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> - */ - public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; - - - /** - * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> - */ - public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout"; - - /** - * Amount of time to wait when reading from the queue - */ - public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout"; - - /** - * Amount of time to wait when reading from the queue in milliseconds - */ - public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout"; - - - String INDEX_QUEUE_SIZE = "elasticsearch.queue_size"; - - - @Default( "1000" ) - @Key( FAILURE_REJECTED_RETRY_WAIT_TIME ) - long getFailureRetryTime(); - - //give us 60 seconds to process the message - @Default( "60" ) - @Key( INDEX_QUEUE_READ_TIMEOUT ) - int getIndexQueueTimeout(); - - @Default( "2" ) - @Key( ELASTICSEARCH_WORKER_COUNT ) - int getWorkerCount(); - - @Default( "LOCAL" ) - @Key( ELASTICSEARCH_QUEUE_IMPL ) - String getQueueImplementation(); - - @Default( "1000" ) - @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT ) - long getQueueOfferTimeout(); - - /** - * 
size of the buffer to build up before you send results - */ - @Default( "1000" ) - @Key( INDEX_QUEUE_SIZE ) - int getIndexQueueSize(); - - - @Default("30000") - @Key("elasticsearch.reindex.sample.interval") - long getReIndexSampleInterval(); - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 3553c87..5bf7957 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -60,20 +60,20 @@ public class ReIndexServiceImpl implements ReIndexService { private final AllApplicationsObservable allApplicationsObservable; private final AllEntityIdsObservable allEntityIdsObservable; - private final QueryFig queryFig; + private final IndexProcessorFig indexProcessorFig; private final RxTaskScheduler rxTaskScheduler; private final MapManager mapManager; - private final AsyncReIndexService indexService; + private final AsyncIndexService indexService; @Inject public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, final MapManagerFactory mapManagerFactory, - final AllApplicationsObservable allApplicationsObservable, final QueryFig queryFig, - final RxTaskScheduler rxTaskScheduler, final AsyncReIndexService indexService ) { + final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig, + final RxTaskScheduler rxTaskScheduler, final AsyncIndexService indexService ) { this.allEntityIdsObservable = allEntityIdsObservable; this.allApplicationsObservable = allApplicationsObservable; - this.queryFig 
= queryFig; + this.indexProcessorFig = indexProcessorFig; this.rxTaskScheduler = rxTaskScheduler; this.indexService = indexService; @@ -108,7 +108,7 @@ public class ReIndexServiceImpl implements ReIndexService { //start our sampler and state persistence //take a sample every sample interval to allow us to resume state with minimal loss - runningReIndex.sample( queryFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, + runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, rxTaskScheduler.getAsyncIOScheduler() ) .doOnNext( edge -> { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java new file mode 100644 index 0000000..d060123 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + + +@Singleton +public class SQSAsyncIndexService implements AsyncIndexService { + + + private static final Logger log = LoggerFactory.getLogger( SQSAsyncIndexService.class ); + + /** + * Set our TTL to 1 month. 
This is high, but in the event of a bug, we want these entries to get removed + */ + public static final int TTL = 60 * 60 * 24 * 30; + + + private static final int MAX_TAKE = 10; + + private static final String QUEUE_NAME = "es_queue"; + + private final QueueManager queue; + private final IndexProcessorFig indexProcessorFig; + private final IndexService indexService; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final RxTaskScheduler rxTaskScheduler; + + private final Timer readTimer; + private final Timer writeTimer; + private final Timer messageProcessingTimer; + + private final Object mutex = new Object(); + + + private final Counter indexErrorCounter; + private final AtomicLong counter = new AtomicLong(); + private final AtomicLong inFlight = new AtomicLong(); + + //the actively running subscription + private List<Subscription> subscriptions = new ArrayList<>(); + + + @Inject + public SQSAsyncIndexService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig, + final MetricsFactory metricsFactory, final IndexService indexService, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final RxTaskScheduler rxTaskScheduler ) { + + this.indexService = indexService; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.rxTaskScheduler = rxTaskScheduler; + + final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); + this.queue = queueManagerFactory.getQueueManager( queueScope ); + this.indexProcessorFig = indexProcessorFig; + + this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "write" ); + this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "read" ); + this.messageProcessingTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "message.processing" ); + this.indexErrorCounter = metricsFactory.getCounter( SQSAsyncIndexService.class, "error" ); + + + //wire up the gauge of inflight messages + 
metricsFactory.addGauge( SQSAsyncIndexService.class, "inflight.meter", new Gauge<Long>() { + @Override + public Long getValue() { + return inFlight.longValue(); + } + } ); + + start(); + } + + + /** + * Offer the EntityIdScope to SQS + */ + private void offer( final EntityIdScope operation ) { + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessage( operation ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + + /** + * Take messages from SQS + */ + public List<QueueMessage> take() { + + //SQS doesn't support more than 10 + final Timer.Context timer = this.readTimer.time(); + + try { + return queue.getMessages( MAX_TAKE, indexProcessorFig.getIndexQueueTimeout(), indexProcessorFig.getIndexQueueTimeout(), + EntityIdScope.class ); + } + //stop our timer + finally { + timer.stop(); + } + } + + + /** + * Ack messages in SQS + */ + public void ack( final List<QueueMessage> messages ) { + + /** + * No op + */ + if ( messages.size() == 0 ) { + return; + } + + queue.commitMessages( messages ); + } + + + @Override + public void index( final EntityIdScope entityIdScope ) { + //queue the re-inex operation + offer( entityIdScope ); + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { + + //create our scope + final EntityIdScope entityIdScope = new EntityIdScope( applicationScope, entity.getId() ); + + //send it to SQS for indexing + index( entityIdScope ); + } + + + /** + * Index an entity and return an observable of the queue message on success + */ + private Observable<IndexOperationMessage> indexEntity( final QueueMessage queueMessage ) { + final EntityIdScope entityIdScope = ( EntityIdScope ) queueMessage.getBody(); + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + final EntityCollectionManager entityCollectionManager = + 
entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + //run the index operation from the entity + return entityCollectionManager.load( entityIdScope.getId() ) + //invoke the indexing and take the last value + .flatMap( entity -> indexService.indexEntity( applicationScope, entity ).last() ); + } + + + /** + * Do the indexing for a list of queue messages + */ + private void doIndex( final List<QueueMessage> queueMessages ) { + //create parallel observables to process all 10 messages + final Observable<Long> observable = Observable.from( queueMessages ).flatMap( ( QueueMessage queueMessage ) -> { + return indexEntity( queueMessage ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + }, MAX_TAKE ).countLong() + + //remove our in flight + .doOnNext( count -> inFlight.addAndGet( -1 * count ) ) + + //do on completed ack messages. Assumes no exceptions were thrown + .doOnCompleted( () -> ack( queueMessages ) ); + + //wrap with our timer and fire + ObservableTimer.time( observable, messageProcessingTimer ).subscribe(); + } + + + /** + * Loop through and start the workers + */ + public void start() { + final int count = indexProcessorFig.getWorkerCount(); + + for ( int i = 0; i < count; i++ ) { + startWorker(); + } + } + + + /** + * Stop the workers + */ + public void stop() { + synchronized ( mutex ) { + //stop consuming + + for ( final Subscription subscription : subscriptions ) { + subscription.unsubscribe(); + } + } + } + + + private void startWorker() { + synchronized ( mutex ) { + + Observable<List<QueueMessage>> consumer = + Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { + @Override + public void call( final Subscriber<? 
super List<QueueMessage>> subscriber ) { + + //name our thread so it's easy to see + Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); + + List<QueueMessage> drainList = null; + + do { + Timer.Context timer = readTimer.time(); + + try { + drainList = take(); + + //emit our list in it's entirity to hand off to a worker pool + subscriber.onNext( drainList ); + + //take since we're in flight + inFlight.addAndGet( drainList.size() ); + } + + catch ( Throwable t ) { + final long sleepTime = indexProcessorFig.getFailureRetryTime(); + + log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t ); + + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); + } + + + try { + Thread.sleep( sleepTime ); + } + catch ( InterruptedException ie ) { + //swallow + } + + indexErrorCounter.inc(); + } + + finally{ + timer.stop(); + } + } + while ( true ); + } + } ) + //this won't block our read loop, just reads and proceeds + .doOnNext( messages -> doIndex( messages ) ).subscribeOn( Schedulers.newThread() ); + + //start in the background + + final Subscription subscription = consumer.subscribe(); + + subscriptions.add( subscription ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java deleted file mode 100644 index 60a804c..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -@Singleton -public class SQSAsyncReIndexService 
implements AsyncReIndexService { - - - private static final Logger logger = LoggerFactory.getLogger( SQSAsyncReIndexService.class ); - - /** Hacky, copied from CPEntityManager b/c we can't access it here */ - public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); - - - /** - * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed - */ - public static final int TTL = 60 * 60 * 24 * 30; - - /** - * The name to put in the map - */ - public static final String MAP_NAME = "esqueuedata"; - - - private static final String QUEUE_NAME = "es_queue"; - - private static SmileFactory SMILE_FACTORY = new SmileFactory(); - - static { - SMILE_FACTORY.delegateToTextual( true ); - } - - - private final QueueManager queue; - private final QueryFig queryFig; - private final ObjectMapper mapper; - private final Meter readMeter; - private final Timer readTimer; - private final Meter writeMeter; - private final Timer writeTimer; - - - @Inject - public SQSAsyncReIndexService( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, - final MetricsFactory metricsFactory ) { - final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); - - this.queue = queueManagerFactory.getQueueManager( queueScope ); - this.queryFig = queryFig; - - this.writeTimer = metricsFactory.getTimer( SQSAsyncReIndexService.class, "write.timer" ); - this.writeMeter = metricsFactory.getMeter( SQSAsyncReIndexService.class, "write.meter" ); - - this.readTimer = metricsFactory.getTimer( SQSAsyncReIndexService.class, "read.timer" ); - this.readMeter = metricsFactory.getMeter( SQSAsyncReIndexService.class, "read.meter" ); - - this.mapper = new ObjectMapper( SMILE_FACTORY ); - //pretty print, disabling for speed - // mapper.enable(SerializationFeature.INDENT_OUTPUT); - - } - - - public void offer( final IndexEntityEvent operation ) { - final Timer.Context timer = this.writeTimer.time(); - 
this.writeMeter.mark(); - - final UUID identifier = UUIDGenerator.newTimeUUID(); - - try { - - final String payLoad = toString( operation ); - - //signal to SQS - this.queue.sendMessage( identifier ); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - - public List<IndexEntityEvent> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - //SQS doesn't support more than 10 - - final int actualTake = Math.min( 10, takeSize ); - - final Timer.Context timer = this.readTimer.time(); - - try { - - List<QueueMessage> messages = queue - .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), - String.class ); - - - final List<IndexEntityEvent> response = new ArrayList<>( messages.size() ); - - final List<String> mapEntries = new ArrayList<>( messages.size() ); - - - if ( messages.size() == 0 ) { - return Collections.emptyList(); - } - - //add all our keys for a single round trip - for ( final QueueMessage message : messages ) { - mapEntries.add( message.getBody().toString() ); - } - - - //load them into our response - for ( final QueueMessage message : messages ) { - - final String payload = getBody( message ); - - //now see if the key was there - - - final IndexEntityEvent messageBody; - - try { - messageBody = fromString( payload ); - } - catch ( IOException e ) { - logger.error( "Unable to deserialize message from string. This is a bug", e ); - throw new RuntimeException( "Unable to deserialize message from string. 
This is a bug", e ); - } - - SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); - - response.add( operation ); - } - - readMeter.mark( response.size() ); - return response; - } - //stop our timer - finally { - timer.stop(); - } - } - - - public void ack( final List<IndexEntityEvent> messages ) { - - //nothing to do - if ( messages.size() == 0 ) { - return; - } - - List<QueueMessage> toAck = new ArrayList<>( messages.size() ); - - for ( IndexEntityEvent ioe : messages ) { - - - final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; - - toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); - } - - queue.commitMessages( toAck ); - } - - - /** Read the object from Base64 string. */ - private IndexEntityEvent fromString( String s ) throws IOException { - IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class ); - return o; - } - - - /** Write the object to a Base64 string. */ - private String toString( IndexEntityEvent o ) throws IOException { - return mapper.writeValueAsString( o ); - } - - - private String getBody( final QueueMessage message ) { - return message.getBody().toString(); - } - - - @Override - public void index( final EntityIdScope entityIdScope ) { - - } - - - /** - * The message that subclasses our IndexOperationMessage. 
holds a pointer to the original message - */ - public class SqsIndexOperationMessage extends IndexEntityEvent { - - private final QueueMessage message; - - - public SqsIndexOperationMessage( final QueueMessage message, final IndexEntityEvent source ) { - super( source.getApplicationScope(), source.getEntityId(), source.getEntityVersion() ); - this.message = message; - } - - - /** - * Get the message from our queue - */ - public QueueMessage getMessage() { - return message; - } - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { - throw new UnsupportedOperationException( "Implement index rebuild" ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index c42ad10..364b071 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -99,8 +99,7 @@ public class CpNamingUtils { * Get the index scope for the edge from the source */ public static IndexEdge generateScopeFromSource( final Edge edge ) { - return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), SearchEdge.NodeType.SOURCE, - edge.getTimestamp() ); + return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), SearchEdge.NodeType.SOURCE, edge.getTimestamp() ); } @@ -108,8 +107,7 @@ public class CpNamingUtils { * Get the index scope for the edge from the source */ public static IndexEdge generateScopeToTarget( final Edge edge ) { - return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), SearchEdge.NodeType.TARGET, - 
edge.getTimestamp() ); + return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), SearchEdge.NodeType.TARGET, edge.getTimestamp() ); } @@ -123,6 +121,8 @@ public class CpNamingUtils { /** * + * TODO move sourceId to ApplicationScope + * * @param sourceId * @param collectionName * @param entityId http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java index 1ad4115..19ecf6d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java @@ -27,6 +27,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -46,6 +47,7 @@ public class SerializableMapper { static{ MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); + SMILE_FACTORY.delegateToTextual( true ); } /** @@ -66,9 +68,9 @@ public class SerializableMapper { /** * Write the value as a string + * @param <T> * @param serialized * @param clazz - * @param <T> * @return */ public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java new file mode 100644 index 0000000..faef848 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.StringField; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; + +import 
net.jcip.annotations.NotThreadSafe;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Base test for {@link AsyncIndexService} implementations. Subclasses supply the
+ * implementation under test via {@link #getAsyncIndexService()}; the test itself writes an
+ * entity and its edges, queues an index update and then polls the index until the entity
+ * becomes searchable.
+ */
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public abstract class AsyncIndexServiceTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    public EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+    @Inject
+    public GraphManagerFactory graphManagerFactory;
+
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+
+    private AsyncIndexService asyncIndexService;
+
+
+    /**
+     * Get the async index service
+     *
+     * @return the implementation exercised by the tests in this class
+     */
+    protected abstract AsyncIndexService getAsyncIndexService();
+
+
+    /**
+     * Resolves the service under test from the subclass before each test.
+     */
+    @Before
+    public void setup() {
+        asyncIndexService = getAsyncIndexService();
+    }
+
+
+    /**
+     * Writes an entity, its collection edge and 10,000 connection edges, queues an index update
+     * for the entity, then polls the index until the entity is returned both for the collection
+     * edge and for the last connection edge emitted by the parallel write.
+     */
+    @Test( timeout = 60000 )
+    public void testMessageIndexing() throws InterruptedException {
+
+
+        final ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
+
+
+        final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write the entity before indexing
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), "things", testEntity.getId() );
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+
+
+        //Write 10k edges 10 at a time in parallel, keeping the last edge emitted for the search below
+        final Edge connectionSearch = Observable.range( 0, 10000 ).flatMap( integer -> {
+            final Id connectingId = createId( "connecting" );
+            final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
+
+            return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() );
+        }, 10 ).toBlocking().last();
+
+
+        asyncIndexService.queueEntityIndexUpdate( applicationScope, testEntity );
+
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+
+
+        //query until it's available
+        final CandidateResults collectionResults = getResults( applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 100, 1, 100 );
+
+        assertEquals( 1, collectionResults.size() );
+
+        assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
+
+
+        final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
+
+        //query until it's available
+        final CandidateResults connectionResults = getResults( applicationEntityIndex, connectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 100, 1, 100 );
+
+        assertEquals( 1, connectionResults.size() );
+
+        assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
+    }
+
+
+    /**
+     * Polls the index until a search returns exactly {@code expectedSize} candidates, sleeping
+     * 100ms between attempts. Fails the test if the expected size is not seen within
+     * {@code attempts} searches, or if the polling thread is interrupted while waiting.
+     *
+     * @param applicationEntityIndex the index to search
+     * @param searchEdge the edge to search on
+     * @param searchTypes the types to restrict the search to
+     * @param ql the query to execute
+     * @param count the limit passed to the search
+     * @param expectedSize the number of candidates that must be returned
+     * @param attempts the maximum number of searches to perform
+     *
+     * @return the first result set whose size equals {@code expectedSize}
+     */
+    private CandidateResults getResults( final ApplicationEntityIndex applicationEntityIndex,
+                                         final SearchEdge searchEdge, final SearchTypes searchTypes, final String ql,
+                                         final int count, final int expectedSize, final int attempts ) {
+
+
+        for ( int i = 0; i < attempts; i++ ) {
+            //use the caller supplied query and limit rather than hard coded values
+            final CandidateResults candidateResults =
+                applicationEntityIndex.search( searchEdge, searchTypes, ql, count );
+
+            if ( candidateResults.size() == expectedSize ) {
+                return candidateResults;
+            }
+
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                //restore the flag and stop polling, an interrupt means we should not keep waiting
+                Thread.currentThread().interrupt();
+                fail( "Interrupted while waiting for candidates of size " + expectedSize );
+            }
+        }
+
+        fail( "Could not find candidates of size " + expectedSize + " after " + attempts + " attempts" );
+
+        //we'll never reach this, required for compile
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
new file mode 100644
index 0000000..43968de
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+
+/**
+ * Runs the shared {@link AsyncIndexServiceTest} suite against
+ * {@link InMemoryAsyncIndexService}.
+ */
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
+
+    // NOTE(review): presumably guards the test when AWS credentials are absent - confirm against NoAWSCredsRule
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+
+    @Inject
+    public IndexService indexService;
+
+    @Inject
+    public RxTaskScheduler rxTaskScheduler;
+
+
+    /**
+     * Builds the in-memory service from the injected index service and scheduler, plus the
+     * collection manager factory inherited from the base test.
+     */
+    @Override
+    protected AsyncIndexService getAsyncIndexService() {
+        final AsyncIndexService inMemoryService =
+            new InMemoryAsyncIndexService( indexService, rxTaskScheduler, entityCollectionManagerFactory );
+
+        return inMemoryService;
+    }
+}