Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev ba9e9dbc7 -> a59133bb6
Refactored the index service into an async event service. These events will affect more than just indexing. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a59133bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a59133bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a59133bb Branch: refs/heads/two-dot-o-dev Commit: a59133bb6061454540e5506bd0bc81793055a314 Parents: ba9e9db Author: Todd Nine <tn...@apigee.com> Authored: Mon Apr 27 16:46:27 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Apr 27 16:46:27 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 6 +- .../corepersistence/CpEntityManager.java | 8 +- .../corepersistence/CpEntityManagerFactory.java | 9 +- .../corepersistence/CpRelationManager.java | 10 +- .../asyncevents/AsyncEventService.java | 71 ++++ .../asyncevents/AsyncIndexProvider.java | 122 +++++++ .../asyncevents/InMemoryAsyncEventService.java | 116 ++++++ .../asyncevents/SQSAsyncEventService.java | 349 +++++++++++++++++++ .../index/AsyncIndexProvider.java | 120 ------- .../index/AsyncIndexService.java | 42 --- .../index/InMemoryAsyncIndexService.java | 95 ----- .../index/ReIndexServiceImpl.java | 5 +- .../index/SQSAsyncIndexService.java | 325 ----------------- .../index/AsyncIndexServiceTest.java | 14 +- .../index/InMemoryAsycIndexServiceTest.java | 8 +- .../index/SQSAsyncEventServiceTest.java | 84 +++++ .../index/SQSAsyncIndexServiceTest.java | 108 ------ 17 files changed, 766 insertions(+), 726 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index ca20e23..b5544cb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -21,8 +21,8 @@ import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.corepersistence.events.EntityDeletedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler; -import org.apache.usergrid.corepersistence.index.AsyncIndexProvider; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.index.IndexServiceImpl; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; @@ -152,7 +152,7 @@ public class CoreModule extends AbstractModule { bind( IndexService.class ).to( IndexServiceImpl.class ); //bind the queue provider - bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class ); + bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class ); install( new GuicyFigModule( IndexProcessorFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 2d419b6..6ffefe3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -66,7 +66,6 @@ import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.util.TraceParticipant; import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; @@ -80,7 +79,6 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.map.MapManager; @@ -178,7 +176,7 @@ public class CpEntityManager implements EntityManager { private final CounterUtils counterUtils; - private final AsyncIndexService indexService; + private final AsyncEventService indexService; private PipelineBuilderFactory pipelineBuilderFactory; @@ -222,7 +220,7 @@ public class CpEntityManager implements EntityManager { * @param entityCollectionManagerFactory * @param graphManagerFactory */ - public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncIndexService indexService, final ManagerCache managerCache, + public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory,final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index b242401..6c375ef 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -51,7 +51,6 @@ import org.apache.usergrid.persistence.cassandra.CassandraService; import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.Setup; import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -63,7 +62,6 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; @@ -75,7 +73,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.utils.UUIDUtils; -import com.amazonaws.services.elastictranscoder.model.Pipeline; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -126,7 +123,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private Injector injector; private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; - private final AsyncIndexService indexService; + private final AsyncEventService indexService; private final PipelineBuilderFactory pipelineBuilderFactory; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, @@ -139,7 +136,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.entityIndexFactory = injector.getInstance(EntityIndexFactory.class); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); - this.indexService = injector.getInstance( AsyncIndexService.class ); + this.indexService = injector.getInstance( AsyncEventService.class ); this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index e7ef0ff..aa0056c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; @@ -50,14 +50,12 @@ import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.SimpleRoleRef; import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Group; import org.apache.usergrid.persistence.entities.User; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; @@ -68,7 +66,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; @@ -85,7 +82,6 @@ import rx.functions.Func1; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; @@ -121,14 +117,14 @@ public class CpRelationManager implements RelationManager { private final ApplicationScope applicationScope; - private final AsyncIndexService indexService; + private final AsyncEventService indexService; private MetricsFactory metricsFactory; private Timer updateCollectionTimer; public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, - final PipelineBuilderFactory pipelineBuilderFactory, final AsyncIndexService indexService, + final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java new file mode 100644 index 0000000..9fbed39 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import org.apache.usergrid.corepersistence.index.ReIndexAction; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * Low level queue service for events in the entity. These events are fire and forget, and will always be asynchronous + */ +public interface AsyncEventService extends ReIndexAction { + + + /** + * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) + * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. + * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. + * @param applicationScope + * @param entity The entity to index. Should be fired when an entity is updated + */ + void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + + + /** + * Fired when a new edge is added to an entity. Such as initial entity creation, adding to a collection, or creating a connection + * + * @param applicationScope + * @param entity + * @param newEdge + */ + void queueNewEdge(final ApplicationScope applicationScope, final Entity entity, final Edge newEdge); + + /** + * Queue the deletion of an edge + * @param applicationScope + * @param edge + */ + void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge); + + /** + * The entity has been deleted, queue it's cleanup + * @param applicationScope + * @param entityId + */ + void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java new file mode 100644 index 0000000..9f801b4 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; + + +/** + * A provider to allow users to configure their queue impl via properties + */ +@Singleton +public class AsyncIndexProvider implements Provider<AsyncEventService> { + + private final IndexProcessorFig indexProcessorFig; + + private final QueueManagerFactory queueManagerFactory; + private final MetricsFactory metricsFactory; + private final IndexService indexService; + private final RxTaskScheduler rxTaskScheduler; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + private AsyncEventService asyncEventService; + + + @Inject + public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, + final MetricsFactory metricsFactory, final IndexService indexService, + final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.indexProcessorFig = indexProcessorFig; + this.queueManagerFactory = queueManagerFactory; + this.metricsFactory = metricsFactory; + this.indexService = indexService; + this.rxTaskScheduler = rxTaskScheduler; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + @Singleton + public AsyncEventService get() { + if ( asyncEventService == null ) { + asyncEventService = getIndexService(); + } + + + return asyncEventService; + } + + + private AsyncEventService getIndexService() { + final String value = indexProcessorFig.getQueueImplementation(); + + final Implementations impl = Implementations.valueOf( value ); + + switch ( impl ) { + case LOCAL: + return new InMemoryAsyncEventService( indexService, rxTaskScheduler, + entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously()); + case SQS: + return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, + entityCollectionManagerFactory, rxTaskScheduler ); + default: + throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); + } + } + + + private String getErrorValues() { + String values = ""; + + for ( final Implementations impl : Implementations.values() ) { + values += impl + ", "; + } + + values = values.substring( 0, values.length() - 2 ); + + return values; + } + + + /** + * Different implementations + */ + public static enum Implementations { + TEST, + LOCAL, + SQS; + + + public String asString() { + return toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java new file mode 100644 index 0000000..2d842fb --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.exception.NotImplementedException; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + + +@Singleton +public class InMemoryAsyncEventService implements AsyncEventService { + + private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class ); + + private final IndexService indexService; + private final RxTaskScheduler rxTaskScheduler; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final boolean resolveSynchronously; + + + @Inject + public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + boolean resolveSynchronously ) { + this.indexService = indexService; + this.rxTaskScheduler = rxTaskScheduler; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.resolveSynchronously = resolveSynchronously; + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { + + //process the entity immediately + //only process the same version, otherwise ignore + + + log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); + + //start it in the background on an i/o thread + if(!resolveSynchronously){ + edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + }else { + edgeObservable.toBlocking().last(); + } + } + + + @Override + public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { + throw new NotImplementedException( "Implement me" ); + } + + + @Override + public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { + throw new NotImplementedException( "Implement me" ); + } + + + @Override + public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + throw new NotImplementedException( "Implement me" ); + } + + + @Override + public void index( final EntityIdScope entityIdScope ) { + + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + + final Id entityId = entityIdScope.getId(); + + //load the entity + entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) + //perform indexing on the task scheduler and start it + .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ) + .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java new file mode 100644 index 0000000..415e5e8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.exception.NotImplementedException; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + + +@Singleton +public class SQSAsyncEventService implements AsyncEventService { + + + private static final Logger log = LoggerFactory.getLogger( SQSAsyncEventService.class ); + + /** + * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed + */ + public static final int TTL = 60 * 60 * 24 * 30; + + + private static final int MAX_TAKE = 10; + + private static final String QUEUE_NAME = "es_queue"; + + private final QueueManager queue; + private final IndexProcessorFig indexProcessorFig; + private final IndexService indexService; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final RxTaskScheduler rxTaskScheduler; + + private final Timer readTimer; + private final Timer writeTimer; + private final Timer messageProcessingTimer; + + private final Object mutex = new Object(); + + + private final Counter indexErrorCounter; + private final AtomicLong counter = new AtomicLong(); + private final AtomicLong inFlight = new AtomicLong(); + + //the actively running subscription + private List<Subscription> subscriptions = new ArrayList<>(); + + + @Inject + public SQSAsyncEventService( final QueueManagerFactory queueManagerFactory, + final IndexProcessorFig indexProcessorFig, final MetricsFactory metricsFactory, + final IndexService indexService, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final RxTaskScheduler rxTaskScheduler ) { + + this.indexService = indexService; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.rxTaskScheduler = rxTaskScheduler; + + final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); + this.queue = queueManagerFactory.getQueueManager( queueScope ); + this.indexProcessorFig = indexProcessorFig; + + this.writeTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "write" ); + this.readTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "read" ); + this.messageProcessingTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "message.processing" ); + this.indexErrorCounter = metricsFactory.getCounter( SQSAsyncEventService.class, "error" ); + + + //wire up the gauge of inflight messages + metricsFactory.addGauge( SQSAsyncEventService.class, "inflight.meter", new Gauge<Long>() { + @Override + public Long getValue() { + return inFlight.longValue(); + } + } ); + + start(); + } + + + /** + * Offer the EntityIdScope to SQS + */ + private void offer( final EntityIdScope operation ) { + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessage( operation ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + + /** + * Take messages from SQS + */ + public List<QueueMessage> take() { + + //SQS doesn't support more than 10 + final Timer.Context timer = this.readTimer.time(); + + try { + return queue.getMessages( MAX_TAKE, indexProcessorFig.getIndexQueueTimeout(), indexProcessorFig.getIndexQueueTimeout(), + EntityIdScope.class ); + } + //stop our timer + finally { + timer.stop(); + } + } + + + /** + * Ack messages in SQS + */ + public void ack( final List<QueueMessage> messages ) { + + /** + * No op + */ + if ( messages.size() == 0 ) { + return; + } + + queue.commitMessages( messages ); + } + + + @Override + public void index( final EntityIdScope entityIdScope ) { + //queue the re-inex operation + offer( entityIdScope ); + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { + + //create our scope + final EntityIdScope entityIdScope = new EntityIdScope( applicationScope, entity.getId() ); + + //send it to SQS for indexing + index( entityIdScope ); + } + + + @Override + public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { + throw new NotImplementedException( "Implement me" ); + } + + + @Override + public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { + throw new NotImplementedException( "Implement me" ); + } + + + @Override + public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + throw new NotImplementedException( "Implement me" ); + } + + + /** + * Index an entity and return an observable of the queue message on success + */ + private Observable<IndexOperationMessage> indexEntity( final QueueMessage queueMessage ) { + final EntityIdScope entityIdScope = ( EntityIdScope ) queueMessage.getBody(); + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + //run the index operation from the entity + return entityCollectionManager.load( entityIdScope.getId() ) + //invoke the indexing and take the last value + .flatMap( entity -> indexService.indexEntity( applicationScope, entity ).last() ); + } + + + /** + * Do the indexing for a list of queue messages + */ + private void doIndex( final List<QueueMessage> queueMessages ) { + //create parallel observables to process all 10 messages + final Observable<Long> observable = Observable.from( queueMessages ).flatMap( ( QueueMessage queueMessage ) -> { + return indexEntity( queueMessage ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + }, MAX_TAKE ).countLong() + + //remove our in flight + .doOnNext( count -> inFlight.addAndGet( -1 * count ) ) + + //do on completed ack messages. Assumes no expections were thrown + .doOnCompleted( () -> ack( queueMessages ) ); + + //wrap with our timer and fire + ObservableTimer.time( observable, messageProcessingTimer ).subscribe(); + } + + + /** + * Loop throught and start the workers + */ + public void start() { + final int count = indexProcessorFig.getWorkerCount(); + + for ( int i = 0; i < count; i++ ) { + startWorker(); + } + } + + + /** + * Stop the workers + */ + public void stop() { + synchronized ( mutex ) { + //stop consuming + + for ( final Subscription subscription : subscriptions ) { + subscription.unsubscribe(); + } + } + } + + + private void startWorker() { + synchronized ( mutex ) { + + Observable<List<QueueMessage>> consumer = + Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { + @Override + public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { + + //name our thread so it's easy to see + Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); + + List<QueueMessage> drainList = null; + + do { + Timer.Context timer = readTimer.time(); + + try { + drainList = take(); + + //emit our list in it's entirity to hand off to a worker pool + subscriber.onNext( drainList ); + + //take since we're in flight + inFlight.addAndGet( drainList.size() ); + } + + catch ( Throwable t ) { + final long sleepTime = indexProcessorFig.getFailureRetryTime(); + + log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t ); + + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); + } + + + try { + Thread.sleep( sleepTime ); + } + catch ( InterruptedException ie ) { + //swallow + } + + indexErrorCounter.inc(); + } + + finally{ + timer.stop(); + } + } + while ( true ); + } + } ) + //this won't block our read loop, just reads and proceeds + .doOnNext( messages -> doIndex( messages ) ).subscribeOn( Schedulers.newThread() ); + + //start in the background + + final Subscription subscription = consumer.subscribe(); + + subscriptions.add( subscription ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java deleted file mode 100644 index c4f34de..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; - - -/** - * A provider to allow users to configure their queue impl via properties - */ -@Singleton -public class AsyncIndexProvider implements Provider<AsyncIndexService> { - - private final IndexProcessorFig indexProcessorFig; - - private final QueueManagerFactory queueManagerFactory; - private final MetricsFactory metricsFactory; - private final IndexService indexService; - private final RxTaskScheduler rxTaskScheduler; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - private AsyncIndexService asyncIndexService; - - - @Inject - public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, - final MetricsFactory metricsFactory, final IndexService indexService, - final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.indexProcessorFig = indexProcessorFig; - this.queueManagerFactory = queueManagerFactory; - this.metricsFactory = metricsFactory; - this.indexService = indexService; - this.rxTaskScheduler = rxTaskScheduler; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - @Singleton - public AsyncIndexService get() { - if ( asyncIndexService == null ) { - asyncIndexService = getIndexService(); - } - - - return asyncIndexService; - } - - - private AsyncIndexService getIndexService() { - final String value = indexProcessorFig.getQueueImplementation(); - - final Implementations impl = Implementations.valueOf( value ); - - switch ( impl ) { - case LOCAL: - return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, - entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously()); - case SQS: - return new SQSAsyncIndexService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, - entityCollectionManagerFactory, rxTaskScheduler ); - default: - throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); - } - } - - - private String getErrorValues() { - String values = ""; - - for ( final Implementations impl : Implementations.values() ) { - values += impl + ", "; - } - - values = values.substring( 0, values.length() - 2 ); - - return values; - } - - - /** - * Different implementations - */ - public static enum Implementations { - TEST, - LOCAL, - SQS; - - - public String asString() { - return toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java deleted file mode 100644 index dac92c8..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; - - -/** - * Low level queue service for indexing entities - */ -public interface AsyncIndexService extends ReIndexAction { - - - /** - * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) - * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. - * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. - * @param applicationScope - * @param entity The entity to index - */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java deleted file mode 100644 index 3f59b0c..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import rx.Observer; - - -@Singleton -public class InMemoryAsyncIndexService implements AsyncIndexService { - - private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncIndexService.class ); - - private final IndexService indexService; - private final RxTaskScheduler rxTaskScheduler; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final boolean resolveSynchronously; - - - @Inject - public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory, boolean resolveSynchronously ) { - this.indexService = indexService; - this.rxTaskScheduler = rxTaskScheduler; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.resolveSynchronously = resolveSynchronously; - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { - - //process the entity immediately - //only process the same version, otherwise ignore - - - log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); - - final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); - - //start it in the background on an i/o thread - if(!resolveSynchronously){ - edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - }else { - edgeObservable.toBlocking().last(); - } - } - - - @Override - public void index( final EntityIdScope entityIdScope ) { - - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - - final Id entityId = entityIdScope.getId(); - - //load the entity - entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) - //perform indexing on the task scheduler and start it - .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ) - .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 5bf7957..be5bcab 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.index; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; @@ -63,14 +64,14 @@ public class ReIndexServiceImpl implements ReIndexService { private final IndexProcessorFig indexProcessorFig; private final RxTaskScheduler rxTaskScheduler; private final MapManager mapManager; - private final AsyncIndexService indexService; + private final AsyncEventService indexService; @Inject public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, final MapManagerFactory mapManagerFactory, final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig, - final RxTaskScheduler rxTaskScheduler, final AsyncIndexService indexService ) { + final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) { this.allEntityIdsObservable = allEntityIdsObservable; this.allApplicationsObservable = allApplicationsObservable; this.indexProcessorFig = indexProcessorFig; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java deleted file mode 100644 index d060123..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.metrics.ObservableTimer; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Timer; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import rx.Subscriber; -import rx.Subscription; -import rx.schedulers.Schedulers; - - -@Singleton -public class SQSAsyncIndexService implements AsyncIndexService { - - - private static final Logger log = LoggerFactory.getLogger( SQSAsyncIndexService.class ); - - /** - * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed - */ - public static final int TTL = 60 * 60 * 24 * 30; - - - private static final int MAX_TAKE = 10; - - private static final String QUEUE_NAME = "es_queue"; - - private final QueueManager queue; - private final IndexProcessorFig indexProcessorFig; - private final IndexService indexService; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final RxTaskScheduler rxTaskScheduler; - - private final Timer readTimer; - private final Timer writeTimer; - private final Timer messageProcessingTimer; - - private final Object mutex = new Object(); - - - private final Counter indexErrorCounter; - private final AtomicLong counter = new AtomicLong(); - private final AtomicLong inFlight = new AtomicLong(); - - //the actively running subscription - private List<Subscription> subscriptions = new ArrayList<>(); - - - @Inject - public SQSAsyncIndexService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig, - final MetricsFactory metricsFactory, final IndexService indexService, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final RxTaskScheduler rxTaskScheduler ) { - - this.indexService = indexService; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.rxTaskScheduler = rxTaskScheduler; - - final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); - this.queue = queueManagerFactory.getQueueManager( queueScope ); - this.indexProcessorFig = indexProcessorFig; - - this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "write" ); - this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "read" ); - this.messageProcessingTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "message.processing" ); - this.indexErrorCounter = metricsFactory.getCounter( SQSAsyncIndexService.class, "error" ); - - - //wire up the gauge of inflight messages - metricsFactory.addGauge( SQSAsyncIndexService.class, "inflight.meter", new Gauge<Long>() { - @Override - public Long getValue() { - return inFlight.longValue(); - } - } ); - - start(); - } - - - /** - * Offer the EntityIdScope to SQS - */ - private void offer( final EntityIdScope operation ) { - final Timer.Context timer = this.writeTimer.time(); - - try { - //signal to SQS - this.queue.sendMessage( operation ); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - - /** - * Take messages from SQS - */ - public List<QueueMessage> take() { - - //SQS doesn't support more than 10 - final Timer.Context timer = this.readTimer.time(); - - try { - return queue.getMessages( MAX_TAKE, indexProcessorFig.getIndexQueueTimeout(), indexProcessorFig.getIndexQueueTimeout(), - EntityIdScope.class ); - } - //stop our timer - finally { - timer.stop(); - } - } - - - /** - * Ack messages in SQS - */ - public void ack( final List<QueueMessage> messages ) { - - /** - * No op - */ - if ( messages.size() == 0 ) { - return; - } - - queue.commitMessages( messages ); - } - - - @Override - public void index( final EntityIdScope entityIdScope ) { - //queue the re-inex operation - offer( entityIdScope ); - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { - - //create our scope - final EntityIdScope entityIdScope = new EntityIdScope( applicationScope, entity.getId() ); - - //send it to SQS for indexing - index( entityIdScope ); - } - - - /** - * Index an entity and return an observable of the queue message on success - */ - private Observable<IndexOperationMessage> indexEntity( final QueueMessage queueMessage ) { - final EntityIdScope entityIdScope = ( EntityIdScope ) queueMessage.getBody(); - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - - //run the index operation from the entity - return entityCollectionManager.load( entityIdScope.getId() ) - //invoke the indexing and take the last value - .flatMap( entity -> indexService.indexEntity( applicationScope, entity ).last() ); - } - - - /** - * Do the indexing for a list of queue messages - */ - private void doIndex( final List<QueueMessage> queueMessages ) { - //create parallel observables to process all 10 messages - final Observable<Long> observable = Observable.from( queueMessages ).flatMap( ( QueueMessage queueMessage ) -> { - return indexEntity( queueMessage ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - }, MAX_TAKE ).countLong() - - //remove our in flight - .doOnNext( count -> inFlight.addAndGet( -1 * count ) ) - - //do on completed ack messages. Assumes no expections were thrown - .doOnCompleted( () -> ack( queueMessages ) ); - - //wrap with our timer and fire - ObservableTimer.time( observable, messageProcessingTimer ).subscribe(); - } - - - /** - * Loop throught and start the workers - */ - public void start() { - final int count = indexProcessorFig.getWorkerCount(); - - for ( int i = 0; i < count; i++ ) { - startWorker(); - } - } - - - /** - * Stop the workers - */ - public void stop() { - synchronized ( mutex ) { - //stop consuming - - for ( final Subscription subscription : subscriptions ) { - subscription.unsubscribe(); - } - } - } - - - private void startWorker() { - synchronized ( mutex ) { - - Observable<List<QueueMessage>> consumer = - Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { - @Override - public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { - - //name our thread so it's easy to see - Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - - List<QueueMessage> drainList = null; - - do { - Timer.Context timer = readTimer.time(); - - try { - drainList = take(); - - //emit our list in it's entirity to hand off to a worker pool - subscriber.onNext( drainList ); - - //take since we're in flight - inFlight.addAndGet( drainList.size() ); - } - - catch ( Throwable t ) { - final long sleepTime = indexProcessorFig.getFailureRetryTime(); - - log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t ); - - if ( drainList != null ) { - inFlight.addAndGet( -1 * drainList.size() ); - } - - - try { - Thread.sleep( sleepTime ); - } - catch ( InterruptedException ie ) { - //swallow - } - - indexErrorCounter.inc(); - } - - finally{ - timer.stop(); - } - } - while ( true ); - } - } ) - //this won't block our read loop, just reads and proceeds - .doOnNext( messages -> doIndex( messages ) ).subscribeOn( Schedulers.newThread() ); - - //start in the background - - final Subscription subscription = consumer.subscribe(); - - subscriptions.add( subscription ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index faef848..8dfa971 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -28,13 +28,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.test.UseModules; @@ -43,7 +41,6 @@ import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.SearchTypes; @@ -53,7 +50,6 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@ -89,18 +85,18 @@ public abstract class AsyncIndexServiceTest { public EntityIndexFactory entityIndexFactory; - private AsyncIndexService asyncIndexService; + private AsyncEventService asyncEventService; /** * Get the async index service */ - protected abstract AsyncIndexService getAsyncIndexService(); + protected abstract AsyncEventService getAsyncEventService(); @Before public void setup() { - asyncIndexService = getAsyncIndexService(); + asyncEventService = getAsyncEventService(); } @@ -141,7 +137,7 @@ public abstract class AsyncIndexServiceTest { }, 10 ).toBlocking().last(); - asyncIndexService.queueEntityIndexUpdate( applicationScope, testEntity ); + asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); // Thread.sleep( 1000000000000l ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java index e3c59c0..2860c89 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java @@ -24,12 +24,12 @@ import org.junit.Rule; import org.junit.runner.RunWith; import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService; import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.index.impl.EsRunner; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@ -53,8 +53,8 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest { @Override - protected AsyncIndexService getAsyncIndexService() { - return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false ); + protected AsyncEventService getAsyncEventService() { + return new InMemoryAsyncEventService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java new file mode 100644 index 0000000..dff88cb --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.junit.Rule; +import org.junit.runner.RunWith; + +import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.SQSAsyncEventService; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; + +import net.jcip.annotations.NotThreadSafe; + +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +@RunWith( EsRunner.class ) +@UseModules( { TestIndexModule.class } ) +@NotThreadSafe +public class SQSAsyncEventServiceTest extends AsyncIndexServiceTest { + + + + @Rule + public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); + + + + @Inject + public QueueManagerFactory queueManagerFactory; + + @Inject + public IndexProcessorFig indexProcessorFig; + + + @Inject + public MetricsFactory metricsFactory; + + @Inject + public IndexService indexService; + + @Inject + public RxTaskScheduler rxTaskScheduler; + + + @Override + protected AsyncEventService getAsyncEventService() { + return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, + entityCollectionManagerFactory, rxTaskScheduler ); + } + + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java deleted file mode 100644 index aa95ae8..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.UUID; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.corepersistence.TestIndexModule; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.index.impl.EsRunner; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.field.StringField; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; - -import com.google.inject.Inject; - -import net.jcip.annotations.NotThreadSafe; - -import rx.Observable; -import rx.schedulers.Schedulers; - -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - - -@RunWith( EsRunner.class ) -@UseModules( { TestIndexModule.class } ) -@NotThreadSafe -public class SQSAsyncIndexServiceTest extends AsyncIndexServiceTest { - - - - @Rule - public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); - - - - @Inject - public QueueManagerFactory queueManagerFactory; - - @Inject - public IndexProcessorFig indexProcessorFig; - - - @Inject - public MetricsFactory metricsFactory; - - @Inject - public IndexService indexService; - - @Inject - public RxTaskScheduler rxTaskScheduler; - - - @Override - protected AsyncIndexService getAsyncIndexService() { - return new SQSAsyncIndexService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, - entityCollectionManagerFactory, rxTaskScheduler ); - } - - - - - -}