Refactored observables Refactored re-index
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59455748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59455748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59455748 Branch: refs/heads/USERGRID-578 Commit: 59455748c0978212396b4bf8f53bc410af81f922 Parents: 078666e Author: Todd Nine <tn...@apigee.com> Authored: Mon Apr 20 18:27:19 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Apr 20 18:27:19 2015 -0600 ---------------------------------------------------------------------- .../AllApplicationsObservable.java | 35 ------ .../usergrid/corepersistence/CoreModule.java | 1 + .../events/EntityVersionDeletedHandler.java | 4 +- .../index/AsyncIndexProvider.java | 17 ++- .../index/AsyncIndexService.java | 5 +- .../index/InMemoryAsyncIndexService.java | 39 ++++++- .../corepersistence/index/QueryFig.java | 5 + .../corepersistence/index/ReIndexService.java | 96 ++++++++++++++++ .../index/ReIndexServiceImpl.java | 114 +++++++++++++++++++ .../index/SQSAsyncIndexService.java | 9 +- .../rx/impl/AbstractGraphVisitorImpl.java | 60 ++-------- .../rx/impl/AllApplicationsObservable.java | 43 +++++++ .../rx/impl/AllApplicationsObservableImpl.java | 5 +- .../rx/impl/AllEntitiesInSystemImpl.java | 14 +-- .../rx/impl/AllEntityIdsObservable.java | 51 +++++++++ .../rx/impl/AllEntityIdsObservableImpl.java | 87 ++++++++++++++ .../rx/impl/AllNodesInGraphImpl.java | 11 +- .../corepersistence/rx/impl/EdgeScope.java | 51 +++++++++ .../corepersistence/util/CpNamingUtils.java | 15 +-- .../util/SerializableMapper.java | 89 +++++++++++++++ .../rx/ApplicationObservableTestIT.java | 3 +- .../rx/EdgesToTargetObservableIT.java | 8 +- .../core/rx/RxTaskSchedulerImpl.java | 9 +- .../core/scope/ApplicationScope.java | 4 +- .../persistence/core/util/StringUtils.java | 34 ++++++ .../graph/serialization/EdgesObservable.java | 16 ++- .../serialization/impl/EdgesObservableImpl.java | 23 +++- .../impl/TargetIdObservableImpl.java | 2 +- .../impl/migration/EdgeDataMigrationImpl.java | 6 +- .../persistence/index/CandidateResults.java | 2 +- .../persistence/index/utils/StringUtils.java | 62 ---------- .../usergrid/rest/management/AdminUsersIT.java | 4 +- 32 files changed, 715 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java deleted file mode 100644 index 24f32f3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence; - - -import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.entities.Application; - -import rx.Observable; - - -/** - * Interface for generating an observable of all ApplicationScope - */ -public interface AllApplicationsObservable extends MigrationDataProvider<ApplicationScope>{ - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 6ebff53..f49e066 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration; import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugin; +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl; import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java index 22f599e..7a93606 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java @@ -23,8 +23,6 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.CpEntityManagerFactory; -import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; @@ -103,7 +101,7 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { //Remove all double indexes - final Observable<IndexEdge> sourceScopes = edgesObservable.edgesFromSource( gm, entityId ).map( + final Observable<IndexEdge> sourceScopes = edgesObservable.edgesFromSourceAscending( gm, entityId ).map( edge -> generateScopeToTarget( edge ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index 77b4990..d00ef8e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.index; +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory; @@ -41,19 +43,25 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { private final MetricsFactory metricsFactory; private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; + private final AllEntityIdsObservable allEntitiesObservable; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; private AsyncIndexService asyncIndexService; @Inject - public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, final - MetricsFactory metricsFactory, - final IndexService indexService, final RxTaskScheduler rxTaskScheduler ) { + public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, + final MetricsFactory metricsFactory, final IndexService indexService, + final RxTaskScheduler rxTaskScheduler, + final AllEntityIdsObservable allEntitiesObservable, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { this.queryFig = queryFig; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; + this.allEntitiesObservable = allEntitiesObservable; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; } @@ -76,7 +84,8 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { switch ( impl ) { case LOCAL: - return new InMemoryAsyncIndexService( indexService, rxTaskScheduler ); + return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, + entityCollectionManagerFactory ); case SQS: return new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); default: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java index 06310ae..8b5ced1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java @@ -26,13 +26,15 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; /** * Low level queue service for indexing entities */ -public interface AsyncIndexService { +public interface AsyncIndexService extends ReIndexService.IndexAction { /** @@ -43,4 +45,5 @@ public interface AsyncIndexService { * @param entity The entity to index */ void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java index 3e2a271..0efb964 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java @@ -20,10 +20,19 @@ package org.apache.usergrid.corepersistence.index; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -33,16 +42,18 @@ import rx.Observable; @Singleton public class InMemoryAsyncIndexService implements AsyncIndexService { + private static final Logger log = LoggerFactory.getLogger(InMemoryAsyncIndexService.class); private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; @Inject - public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler ) { + public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { this.indexService = indexService; - - this.rxTaskScheduler = rxTaskScheduler; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; } @@ -53,7 +64,29 @@ public class InMemoryAsyncIndexService implements AsyncIndexService { //only process the same version, otherwise ignore Observable.just( toIndex ).doOnNext( entity -> { + log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); indexService.indexEntity( applicationScope, entity ); } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } + + + + @Override + public void index( final EntityIdScope entityIdScope ) { + + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + + final Id entityId = entityIdScope.getId(); + + final Entity + entity = entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( + entityId ).toBlocking().lastOrDefault( null ); + + + if(entity == null){ + log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope ); + } + + indexService.indexEntity(applicationScope, entity ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java index a7d2450..82ed496 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java @@ -95,4 +95,9 @@ public interface QueryFig extends GuicyFig { int getIndexQueueSize(); + @Default("30000") + @Key("elasticsearch.reindex.sample.interval") + long getReIndexSampleInterval(); + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java new file mode 100644 index 0000000..dca6cac --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + +import rx.Observable; +import rx.Observer; + + +/** + * An interface for re-indexing all entities in an application + */ +public interface ReIndexService { + + + /** + * Reindex all applications using the cursor provided + * + * @param startTimestamp The timestamp to start seeking from + * + * @return a cursor that can be used to resume the operation on the next run + */ + IndexResponse reIndex( final rx.Observable<ApplicationScope> applicationScopes, final Optional<String> cursor, + final Optional<Long> startTimestamp, final IndexAction indexAction ); + + + /** + * The response when requesting a re-index operation + */ + class IndexResponse { + final String cursor; + final Observable<Long> indexedEdgecount; + + + public IndexResponse( final String cursor, final Observable<Long> indexedEdgecount ) { + this.cursor = cursor; + this.indexedEdgecount = indexedEdgecount; + } + + + /** + * Get the cursor used to resume this operation + * @return + */ + public String getCursor() { + return cursor; + } + + + /** + * Return the observable long count of all edges indexed + * @return + */ + public Observable<Long> getCount() { + return indexedEdgecount; + } + } + + + + + /** + * Callback to perform an index operation based on an scope during bulk re-index operations + */ + @FunctionalInterface + interface IndexAction { + + void index( final EntityIdScope entityIdScope ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java new file mode 100644 index 0000000..5c022e1 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.concurrent.TimeUnit; + +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.corepersistence.util.SerializableMapper; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.StringUtils; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.google.common.base.Optional; + +import rx.Observable; +import rx.observables.ConnectableObservable; + + +public class ReIndexServiceImpl implements ReIndexService { + + private static final MapScope RESUME_MAP_SCOPTE = + new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" ); + + //Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway. + private static final int INDEX_TTL = 60 * 60 * 24 * 10; + + + private final AllEntityIdsObservable allEntityIdsObservable; + private final QueryFig queryFig; + private final RxTaskScheduler rxTaskScheduler; + private final MapManager mapManager; + + + public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, + final MapManagerFactory mapManagerFactory, final QueryFig queryFig, + final RxTaskScheduler rxTaskScheduler ) { + this.allEntityIdsObservable = allEntityIdsObservable; + this.queryFig = queryFig; + this.rxTaskScheduler = rxTaskScheduler; + + this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE ); + } + + + @Override + public IndexResponse reIndex( final Observable<ApplicationScope> applicationScopes, final Optional<String> cursor, + final Optional<Long> startTimestamp, final IndexAction indexAction ) { + + + //load our last emitted Scope if a cursor is present + if ( cursor.isPresent() ) { + throw new UnsupportedOperationException( "Build this" ); + } + + final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + + //create an observable that loads each entity and indexes it, start it running with publish + final ConnectableObservable<EdgeScope> runningReIndex = + allEntityIdsObservable.getEdgesToEntities( applicationScopes, startTimestamp ) + + //for each edge, create our scope and index on it + .doOnNext( edge -> indexAction + .index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ) + + .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).publish(); + + + //count our longs + final Observable<Long> indexedCount = runningReIndex.countLong(); + + + //start our sampler and state persistence + //take a sample every sample interval to allow us to resume state with minimal loss + runningReIndex.sample( queryFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, + rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( edge -> { + + final String serializedState = SerializableMapper.asString( edge ); + + mapManager.putString( newCursor, serializedState, INDEX_TTL ); + } ).subscribe(); + + + return new IndexResponse( newCursor, indexedCount ); + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java index dfcb97a..6d06637 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; @@ -233,6 +234,12 @@ public class SQSAsyncIndexService implements AsyncIndexService { } + @Override + public void index( final EntityIdScope entityIdScope ) { + + } + + /** * The message that subclasses our IndexOperationMessage. holds a pointer to the original message */ @@ -258,6 +265,6 @@ public class SQSAsyncIndexService implements AsyncIndexService { @Override public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { - + throw new UnsupportedOperationException( "Implement me" ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java index ef66481..e307233 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java @@ -20,88 +20,46 @@ package org.apache.usergrid.corepersistence.rx.impl; -import org.apache.usergrid.corepersistence.AllApplicationsObservable; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable; -import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; import rx.Observable; -import rx.functions.Func1; /** - * An observable that will emit every entity Id stored in our entire system across all apps. - * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s - * source node + * An observable that returns all entities in the collections */ public abstract class AbstractGraphVisitorImpl<T> implements MigrationDataProvider<T> { private final AllApplicationsObservable applicationObservable; - private final GraphManagerFactory graphManagerFactory; - private final TargetIdObservable targetIdObservable; + private final AllEntityIdsObservable allEntityIdsObservable; @Inject public AbstractGraphVisitorImpl( AllApplicationsObservable applicationObservable, - GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable ) { + final AllEntityIdsObservable allEntityIdsObservable ) { this.applicationObservable = applicationObservable; - this.graphManagerFactory = graphManagerFactory; - this.targetIdObservable = targetIdObservable; + this.allEntityIdsObservable = allEntityIdsObservable; } @Override public Observable<T> getData() { - return applicationObservable.getData().flatMap( new Func1<ApplicationScope, Observable<T>>() { - @Override - public Observable<T> call( final ApplicationScope applicationScope ) { - return getAllEntities( applicationScope ); - } - } ); + return allEntityIdsObservable.getEntities( applicationObservable.getData() ).map( + entityIdScope -> generateData( entityIdScope ) ); } - private Observable<T> getAllEntities(final ApplicationScope applicationScope) { - final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope); - final Id applicationId = applicationScope.getApplication(); - - //load all nodes that are targets of our application node. I.E. - // entities that have been saved - final Observable<Id> entityNodes = - targetIdObservable.getTargetNodes(gm, applicationId); - - //emit Scope + ID - - //create our application node to emit since it's an entity as well - final Observable<Id> applicationNode = Observable.just(applicationId); - - //merge both the specified application node and the entity node - // so they all get used - return Observable - .merge( applicationNode, entityNodes ). - map( new Func1<Id, T>() { - @Override - public T call( final Id id ) { - return generateData(applicationScope, id); - } - } ); - } - - /** * Generate the data for the observable stream from the scope and the node id - * @param applicationScope - * @param nodeId + * @param entityIdScope * @return */ - protected abstract T generateData(final ApplicationScope applicationScope, final Id nodeId); + protected abstract T generateData(final EntityIdScope entityIdScope); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservable.java new file mode 100644 index 0000000..76ad8a1 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservable.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + +import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.amazonaws.services.opsworks.model.App; + +import rx.Observable; + + +/** + * Interface for generating an observable of all ApplicationScope + */ +public interface AllApplicationsObservable extends MigrationDataProvider<ApplicationScope>{ + + + /** + * Get an observable of application scopes to be used + * @return + */ + Observable<ApplicationScope> getData(); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java index 5151857..5e2f6b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java @@ -23,13 +23,11 @@ package org.apache.usergrid.corepersistence.rx.impl; import java.util.Arrays; import java.util.UUID; -import org.apache.usergrid.persistence.Schema; -import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.AllApplicationsObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -40,6 +38,7 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.utils.UUIDUtils; import com.google.inject.Inject; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java index 83f7d84..a315482 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.rx.impl; -import org.apache.usergrid.corepersistence.AllApplicationsObservable; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -41,17 +40,14 @@ public class AllEntitiesInSystemImpl extends AbstractGraphVisitorImpl<EntityIdSc @Inject public AllEntitiesInSystemImpl( final AllApplicationsObservable applicationObservable, - final GraphManagerFactory graphManagerFactory, - final TargetIdObservable targetIdObservable ) { - super( applicationObservable, graphManagerFactory, targetIdObservable ); + final AllEntityIdsObservable allEntityIdsObservable) { + super( applicationObservable, allEntityIdsObservable ); } @Override - protected EntityIdScope generateData( final ApplicationScope applicationScope, final Id nodeId ) { - - final EntityIdScope idScope = new EntityIdScope( applicationScope, nodeId ); - - return idScope; + protected EntityIdScope generateData( final EntityIdScope entityIdScope ) { + return entityIdScope; } + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java new file mode 100644 index 0000000..c805a59 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + +import com.google.common.base.Optional; + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import rx.Observable; + + +/** + * An implementation that will provide all entityId scopes in the system + */ +public interface AllEntityIdsObservable { + + /** + * Return an observable of scopes from the given appScopes + * @param appScopes + * @return An observable of entityId scopes + */ + Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ); + + /** + * Get all edges that represent edges to entities in the system + * @param appScopes + * @param startTime The time to + * @return + */ + Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<Long> startTime); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java new file mode 100644 index 0000000..1f74420 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + + +import com.google.common.base.Optional; + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; +import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * An implementation that will provide all entityId scopes in the system + */ +public class AllEntityIdsObservableImpl implements AllEntityIdsObservable { + private final GraphManagerFactory graphManagerFactory; + private final TargetIdObservable targetIdObservable; + private final EdgesObservable edgesObservable; + + + public AllEntityIdsObservableImpl( final GraphManagerFactory graphManagerFactory, + final TargetIdObservable targetIdObservable, + final EdgesObservable edgesObservable ) { + this.graphManagerFactory = graphManagerFactory; + this.targetIdObservable = targetIdObservable; + this.edgesObservable = edgesObservable; + } + + + @Override + public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) { + + return appScopes.flatMap( applicationScope -> { + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + final Id applicationId = applicationScope.getApplication(); + + //load all nodes that are targets of our application node. I.E. + // entities that have been saved + final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId ); + + + //create our application node to emit since it's an entity as well + final Observable<Id> applicationNode = Observable.just( applicationId ); + + //merge both the specified application node and the entity node + // so they all get used + return Observable.merge( applicationNode, entityNodes ). + map( id -> new EntityIdScope( applicationScope, id ) ); + } ); + } + + + @Override + public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<Long> startTime) { + + return appScopes.flatMap( applicationScope -> { + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + + return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(), startTime ).map( edge -> new EdgeScope(applicationScope, edge )); + } ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java index 19292b8..f802654 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence.rx.impl; -import org.apache.usergrid.corepersistence.AllApplicationsObservable; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable; @@ -42,14 +42,13 @@ public class AllNodesInGraphImpl extends AbstractGraphVisitorImpl<GraphNode> { @Inject public AllNodesInGraphImpl( final AllApplicationsObservable applicationObservable, - final GraphManagerFactory graphManagerFactory, - final TargetIdObservable targetIdObservable ) { - super( applicationObservable, graphManagerFactory, targetIdObservable ); + final AllEntityIdsObservable allEntityIdsObservable ) { + super( applicationObservable, allEntityIdsObservable ); } @Override - protected GraphNode generateData( final ApplicationScope applicationScope, final Id nodeId ) { - return new GraphNode( applicationScope, nodeId ); + protected GraphNode generateData( final EntityIdScope entityIdScope ) { + return new GraphNode(entityIdScope.getApplicationScope(), entityIdScope.getId()); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/EdgeScope.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/EdgeScope.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/EdgeScope.java new file mode 100644 index 0000000..c5639ce --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/EdgeScope.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + +import java.io.Serializable; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; + + +/** + * Get the scope for the edge + */ +public final class EdgeScope implements Serializable { + private final ApplicationScope applicationScope; + private final Edge edge; + + + public EdgeScope( final ApplicationScope applicationScope, final Edge edge ) { + this.applicationScope = applicationScope; + this.edge = edge; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public Edge getEdge() { + return edge; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index c60b86e..67cc0ca 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -73,13 +73,6 @@ public class CpNamingUtils { public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz"; - /** - * Create an Id object from the entity ref - */ - public static Id createId( final EntityRef entityRef ) { - return new SimpleId( entityRef.getUuid(), entityRef.getType() ); - } - /** * Generate a standard edge name for our graph using the connection name. To be used only for searching. DO NOT use @@ -229,6 +222,14 @@ public class CpNamingUtils { /** + * Generate an application scope for the management application + * @return + */ + public static Id getManagementApplicationId(){ + return generateApplicationId( MANAGEMENT_APPLICATION_ID ); + } + + /** * Get the map scope for the applicationId to store entity uuid to type mapping */ public static MapScope getEntityTypeMapScope( final Id applicationId ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java new file mode 100644 index 0000000..1ad4115 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.util; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Preconditions; + + +/** + * A simple utility for serializing serializable classes to/and from strings. To be used for small object storage only, such as resume on re-index + * storing data such as entities should be specialized. + */ +public class SerializableMapper { + + private static final SmileFactory SMILE_FACTORY = new SmileFactory(); + + private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); + + static{ + MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); + } + + /** + * Get value as a string + * @param toSerialize + * @param <T> + * @return + */ + public static <T extends Serializable> String asString(final T toSerialize){ + try { + return MAPPER.writeValueAsString( toSerialize ); + } + catch ( JsonProcessingException e ) { + throw new RuntimeException( "Unable to process json", e ); + } + } + + + /** + * Write the value as a string + * @param serialized + * @param clazz + * @param <T> + * @return + */ + public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){ + Preconditions.checkNotNull(serialized, "serialized string cannot be null"); + + + InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8)); + + try { + return MAPPER.readValue( stream, clazz ); + } + catch ( IOException e ) { + throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e ); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java index 353e2fa..a619e4d 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java @@ -28,8 +28,8 @@ import org.junit.Test; import org.apache.usergrid.AbstractCoreIT; import org.apache.usergrid.cassandra.SpringResource; -import org.apache.usergrid.corepersistence.AllApplicationsObservable; import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Application; @@ -52,6 +52,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT { final Application createdApplication = app.getEntityManager().getApplication(); + AllApplicationsObservable applicationObservable =SpringResource.getInstance().getBean(Injector.class).getInstance(AllApplicationsObservable.class); //now our get all apps we expect. There may be more, but we don't care about those. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java index 6d228b2..92f2b01 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java @@ -23,7 +23,6 @@ package org.apache.usergrid.corepersistence.rx; import java.util.HashSet; import java.util.Set; -import org.apache.usergrid.corepersistence.CpSetup; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.junit.Test; @@ -35,15 +34,12 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.utils.EdgeTestUtils; import com.google.inject.Injector; -import rx.functions.Action1; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -93,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { final GraphManager gm = managerCache.getGraphManager( scope ); - edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> { + edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> { final String edgeType = edge.getType(); final Id target = edge.getTargetNode(); @@ -122,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { //test connections - edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> { + edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> { final String edgeType = edge.getType(); final Id target = edge.getTargetNode(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java index 219cde6..311f7f4 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java @@ -83,8 +83,7 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler { public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) { - super( 1, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), - new RejectedHandler() ); + super( 1, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), new RejectedHandler() ); } } @@ -118,11 +117,11 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler { @Override public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { - log.warn( "{} task queue full, rejecting task {}", poolName, r ); + log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() ); - //TODO T.N. do we want to run this on the caller thread? + //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected - throw new RejectedExecutionException( "Unable to run task, queue full" ); + r.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java index 920421d..5d22eff 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java @@ -19,13 +19,15 @@ package org.apache.usergrid.persistence.core.scope; +import java.io.Serializable; + import org.apache.usergrid.persistence.model.entity.Id; /** * A scope used for organizations */ -public interface ApplicationScope { +public interface ApplicationScope extends Serializable { /** * Get an Application scope http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/StringUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/StringUtils.java new file mode 100644 index 0000000..acfc2d8 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/StringUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.persistence.core.util; + + +import java.util.UUID; + + +public class StringUtils extends org.apache.commons.lang.StringUtils { + + + /** + * Remove dashes from our uuid + * @param uuid + * @return + */ + public static String sanitizeUUID(final UUID uuid){ + return uuid.toString().replace( "-", "" ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java index fa73991..9e7b8e6 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java @@ -22,6 +22,9 @@ package org.apache.usergrid.persistence.graph.serialization; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + import rx.Observable; /** @@ -35,7 +38,18 @@ public interface EdgesObservable { * @param sourceNode * @return */ - Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode); + Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ); + + + /** + * Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified + * @param gm + * @param sourceNode + * @param startTimestamp + * @return + */ + Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, + final Optional<Long> startTimestamp ); /** * Get all edges from the source node with the target type http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index 2264cbd..859ca2e 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -32,6 +32,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -53,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable { * Get all edges from the source */ @Override - public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode ) { + public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) { final Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); @@ -69,6 +71,25 @@ public class EdgesObservableImpl implements EdgesObservable { @Override + public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, + final Optional<Long> startTimestamp ) { + + final Observable<String> edgeTypes = + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); + + + return edgeTypes.flatMap( edgeType -> { + + logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); + + return gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING, + null ) ); + } ); + } + + + @Override public Observable<Edge> getEdgesFromSource( final GraphManager gm, final Id sourceNode, final String targetType ) { final Observable<String> edgeTypes = http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java index 9729e63..5cf5117 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java @@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable { public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) { //only search edge types that start with collections - return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() { + return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java index 2e0412f..0df26ff 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java @@ -38,8 +38,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; import rx.schedulers.Schedulers; import java.util.List; @@ -87,9 +85,9 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> { final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( graphNode -> { final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope ); - + //get edges from the source - return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ) + return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 ) .doOnNext( edges -> { final MutationBatch batch = keyspace.prepareMutationBatch(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java index 14130bc..e4624a9 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import static org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID; +import static org.apache.usergrid.persistence.core.util.StringUtils.sanitizeUUID; /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java deleted file mode 100644 index a567594..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.persistence.index.utils; - - -import java.util.Arrays; -import java.util.UUID; - -import org.apache.commons.io.IOUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.usergrid.persistence.index.utils.ConversionUtils.string; - - -public class StringUtils extends org.apache.commons.lang.StringUtils { - - private static final Logger LOG = LoggerFactory.getLogger( StringUtils.class ); - - - - public static String stringOrSubstringBeforeFirst( String str, char c ) { - if ( str == null ) { - return null; - } - int i = str.indexOf( c ); - if ( i != -1 ) { - return str.substring( 0, i ); - } - return str; - } - - - public static String toString( Object obj ) { - return string( obj ); - } - - - /** - * Remove dashes from our uuid - * @param uuid - * @return - */ - public static String sanitizeUUID(final UUID uuid){ - return uuid.toString().replace( "-", "" ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59455748/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java index 36a9400..de3a6ce 100644 --- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java @@ -29,14 +29,13 @@ import javax.mail.internet.MimeMultipart; import org.junit.Before; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.jvnet.mock_javamail.Mailbox; import org.apache.usergrid.management.MockImapClient; -import org.apache.usergrid.persistence.index.utils.StringUtils; +import org.apache.usergrid.persistence.core.util.StringUtils; import org.apache.usergrid.persistence.index.utils.UUIDUtils; import org.apache.usergrid.rest.test.resource2point0.AbstractRestIT; import org.apache.usergrid.rest.test.resource2point0.endpoints.mgmt.ManagementResource; @@ -46,7 +45,6 @@ import org.apache.usergrid.rest.test.resource2point0.model.Entity; import org.apache.usergrid.rest.test.resource2point0.model.QueryParameters; import org.apache.usergrid.rest.test.resource2point0.model.Token; -import com.fasterxml.jackson.databind.JsonNode; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.representation.Form;