Parallel device fetching from users; this still needs to be updated to support all push notification (PN) use cases with this parallelism.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cc3cbfee Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cc3cbfee Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cc3cbfee Branch: refs/heads/asf-site Commit: cc3cbfee60f83e107269fe4fb558cb094b5c2032 Parents: 8cf7825 Author: Michael Russo <[email protected]> Authored: Sun Apr 17 14:34:45 2016 +0100 Committer: Michael Russo <[email protected]> Committed: Sun Apr 17 14:34:45 2016 +0100 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 21 ++++ .../pipeline/builder/IdBuilder.java | 6 ++ .../pipeline/read/FilterFactory.java | 8 +- .../pipeline/read/traverse/IdFilter.java | 52 ++++++++++ .../results/IdQueryExecutor.java | 66 ++++++++++++ .../service/CollectionSearch.java | 9 ++ .../service/CollectionService.java | 5 + .../service/CollectionServiceImpl.java | 23 ++++ .../persistence/NotificationGraphIterator.java | 79 +++++++++++++- .../apache/usergrid/persistence/Results.java | 16 +++ .../persistence/entities/Notification.java | 16 ++- .../impl/ApplicationQueueManagerImpl.java | 104 +++++++++++++++---- 12 files changed, 369 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 9ecf466..5596ab4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -19,6 +19,7 @@ package 
org.apache.usergrid.corepersistence; import java.util.*; +import org.apache.usergrid.corepersistence.results.IdQueryExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -614,6 +615,24 @@ public class CpRelationManager implements RelationManager { final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl(); final Id ownerId = headEntity.asId(); + + if(query.getLevel() == Level.IDS ){ + + return new IdQueryExecutor( toExecute.getCursor() ) { + @Override + protected Observable<ResultsPage<Id>> buildNewResultsPage( + final Optional<String> cursor ) { + + final CollectionSearch search = + new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(), + queryString, cursor ); + + return collectionService.searchCollectionIds( search ); + } + }.next(); + + } + //wire the callback so we can get each page return new EntityQueryExecutor( toExecute.getCursor() ) { @Override @@ -989,6 +1008,8 @@ public class CpRelationManager implements RelationManager { // } // } + + return query; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 0f784a6..65cf7c1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -30,6 +30,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter; import 
org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -148,4 +149,9 @@ public class IdBuilder { return new ConnectionRefBuilder(connectionRefFilter); } + public Observable<ResultsPage<Id>> build(){ + //we must add our resume filter so we drop our previous page first element if it's present + return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute(); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index ca5695c..883fdc8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -27,13 +27,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityF import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter; import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter; import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter; -import 
org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.*; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java new file mode 100644 index 0000000..9b3a05a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + +import java.util.List; + + +/** + * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should + * be removed + */ +public class IdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{ + + @Inject + public IdFilter() {}; + + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { + //ignore what our input was, and simply emit the id specified + return filterValueObservable.map( idFilterResult -> new FilterResult( idFilterResult.getValue(), idFilterResult.getPath() )); + + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java new file mode 100644 index 0000000..5a0bb3f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.ConnectionRef; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * Processes our results of connection refs + */ +@Deprecated//Required for 1.0 compatibility +public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> { + + + protected IdQueryExecutor( final Optional<String> startCursor ) { + super( startCursor ); + } + + + @Override + protected Results createResults( final ResultsPage resultsPage ) { + final List<Id> ids = resultsPage.getEntityList(); + + List<UUID> uuids = ids.stream().map(id -> id.getUuid()).collect(Collectors.toList()); + + final Results results = Results.fromIdList(uuids); + + return results; + } + + +} 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java index ab8a8bc..602a5b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java @@ -29,6 +29,10 @@ import com.google.common.base.Optional; */ public class CollectionSearch { + public enum Level { + IDS, ALL + } + private final ApplicationScope applicationScope; private final Id collectionOwnerId; private final String collectionName; @@ -36,6 +40,7 @@ public class CollectionSearch { private final int limit; private final Optional<String> query; private final Optional<String> cursor; + private Level level = Level.ALL; public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String @@ -84,4 +89,8 @@ public class CollectionSearch { public Id getCollectionOwnerId() { return collectionOwnerId; } + + public void setResultsLevel(Level level){ this.level = level; } + + public Level getResultsLevel(){ return level; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java index eef741a..6a46022 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java @@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.service; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; @@ -35,4 +36,8 @@ public interface CollectionService { * @return An observable with results page entries for the stream */ Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search); + + + + Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java index fa79d09..9244315 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java @@ -29,6 +29,7 @@ import com.google.common.base.Optional; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; @@ -73,4 +74,26 @@ public class CollectionServiceImpl implements CollectionService { return results.build(); } + + @Override + public Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search ) { + + + final ApplicationScope applicationScope = search.getApplicationScope(); + final String collectionName = search.getCollectionName(); + final Optional<String> query = search.getQuery(); + + final IdBuilder pipelineBuilder = + 
pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() ) + .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() ); + + + final IdBuilder results; + + + results = pipelineBuilder.traverseCollection( collectionName ); + + + return results.build(); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java index b83f555..a1f3246 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java @@ -17,10 +17,16 @@ package org.apache.usergrid.persistence; +import org.apache.usergrid.persistence.entities.Group; +import org.apache.usergrid.persistence.entities.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; public class NotificationGraphIterator implements ResultsIterator, Iterable { @@ -58,8 +64,19 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { return true; } while (source.hasNext()) { - EntityRef ref = source.next(); - Results r = getResultsFor(ref); + Object next = source.next(); + Results r; + +// if(next instanceof UUID){ +// +// UUID id = (UUID) next; +// r = getResultsForId(id, "user"); +// +// }else { + EntityRef ref = (EntityRef) next; + r = getResultsFor(ref); + // } + if (r.size() > 0) { currentIterator = new PagingResultsIterator(r, query.getResultsLevel()); return currentIterator.hasNext(); @@ -90,13 +107,57 @@ public class NotificationGraphIterator 
implements ResultsIterator, Iterable { try { + + query.setLimit(Query.MAX_LIMIT); // always fetch our MAX limit to reduce # of IO hops if (query.getCollection() != null) { + // make sure this results in graph traversal + query.setQl("select *"); + if(logger.isTraceEnabled()) { logger.trace("Fetching with refType: {}, collection: {} with no query", ref.getType(), query.getCollection()); } - return entityManager.searchCollection(ref, query.getCollection(), null); + + // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities + if( ref.getType().equals(Group.ENTITY_TYPE)){ + + // query users using IDs as we don't need to load the full entities just to find their devices + Query usersQuery = new Query(); + usersQuery.setCollection("users"); + usersQuery.setResultsLevel(Query.Level.IDS); + usersQuery.setLimit(1000); + + + // set the query level for the iterator temporarily to IDS + query.setResultsLevel(Query.Level.IDS); + + return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery); + + +// List<EntityRef> refs = +// results.getIds().stream() +// .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList()); +// +// // set the query level for the iterator back to REFS after mapping our IDS +// query.setResultsLevel(Query.Level.REFS); +// return Results.fromRefList(refs); + + } + + if( ref.getType().equals(User.ENTITY_TYPE)){ + + Query devicesQuery = new Query(); + devicesQuery.setCollection("devices"); + devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); + + //query.setCollection("devices"); + //query.setResultsLevel(Query.Level.CORE_PROPERTIES); + return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery); + } + + return entityManager.searchCollection(ref, query.getCollection(), query); + } else { @@ -105,7 +166,7 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { ref.getType(), query.getCollection()); } - 
query.setQl("select *"); + query.setQl("select *"); // make sure this results in graph traversal return entityManager.searchTargetEntities(ref, query); } @@ -116,4 +177,14 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { } } + + private Results getResultsForId(UUID uuid, String type) { + + EntityRef ref = new SimpleEntityRef(type, uuid); + return getResultsFor(ref); + + + } + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java index e9a3251..2a84622 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java @@ -708,6 +708,22 @@ public class Results implements Iterable<Entity> { } } + public void addEntities( Results results){ + + if(entities == null){ + //init(); + entities = new ArrayList<>(); + level = Level.CORE_PROPERTIES; + } + + if( results.getEntities().size() > 0){ + + entities.addAll(results.getEntities()); + + } + + } + /** Remove the passed in results from the current results */ public void subtract( Results results ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java index aca10cf..d4f3529 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java @@ 
-409,20 +409,28 @@ public class Notification extends TypedEntity { if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE) )){ - Query usersQuery = new Query(); + final Query usersQuery = new Query(); usersQuery.setQl("select *"); usersQuery.setCollection("users"); usersQuery.setLimit(100); - Query devicesQuery = new Query(); + final Query devicesQuery = new Query(); devicesQuery.setQl("select *"); devicesQuery.setCollection("devices"); - usersQuery.setLimit(100); + devicesQuery.setLimit(100); // build up the chain so the proper iterators can be used later - pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery ); + pathQuery = pathQuery.chain( usersQuery );//.chain( devicesQuery ); + + }else if(pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(User.ENTITY_TYPE))){ + + final Query devicesQuery = new Query(); + devicesQuery.setQl("select *"); + devicesQuery.setCollection("devices"); + devicesQuery.setLimit(100); + pathQuery = pathQuery.chain( devicesQuery ); } } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 5ce1b93..1cbb2c6 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -19,10 +19,7 @@ package org.apache.usergrid.services.notifications.impl; import com.codahale.metrics.Meter; import org.apache.usergrid.batch.JobExecution; import org.apache.usergrid.persistence.*; -import 
org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.entities.*; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.queue.QueueManager; @@ -108,6 +105,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if (logger.isTraceEnabled()) { logger.trace("notification {} start query", notification.getUuid()); } + logger.info("notification {} start query", notification.getUuid()); + // the main iterator can use graph traversal or index querying @@ -185,30 +184,80 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final Map<String, Object> filters = notification.getFilters(); - Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator)) - .flatMap(entity -> { - if(entity.getType().equals(Device.ENTITY_TYPE)){ - return Observable.from(Collections.singletonList(entity)); - } - // if it's not a device, drill down and get them - return Observable.from(getDevices(entity)); + Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator)) +// .flatMap(entity -> { +// +// if(entity.getType().equals(Device.ENTITY_TYPE)){ +// return Observable.from(Collections.singletonList(entity)); +// } +// +// // if it's not a device, drill down and get them +// return Observable.from(getDevices(entity)); +// +// }) + .distinct() + .flatMap( entityRef -> { + + return Observable.just(entityRef).flatMap(ref->{ + + List<Entity> entities = new ArrayList<>(); + + Query devicesQuery = new Query(); + devicesQuery.setCollection("devices"); + devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); + + try { + + entities = em.searchCollection(new 
SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities(); + + }catch (Exception e){ + + logger.error("Unable to load devices for user: {}", ref); + return Observable.empty(); + } - }) - .distinct(ref -> ref.getUuid() ) - .map( entityRef -> entityRef.getUuid() ) - .buffer(10) - .flatMap( uuids -> { - if(logger.isTraceEnabled()) { - logger.trace("Processing batch of {} device(s)", uuids.size()); - } - return Observable.from(em.getEntities(uuids, "device")) +// if( ref.getType().equals(User.ENTITY_TYPE)){ +// +// Query devicesQuery = new Query(); +// devicesQuery.setCollection("devices"); +// devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); +// +// try { +// +// entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities(); +// +// }catch (Exception e){ +// +// logger.error("Unable to load devices for user: {}", ref.getUuid()); +// return Observable.empty(); +// } +// +// +// }else if ( ref.getType().equals(Device.ENTITY_TYPE)){ +// +// try{ +// entities.add(em.get(ref)); +// +// }catch(Exception e){ +// +// logger.error("Unable to load device: {}", ref.getUuid()); +// return Observable.empty(); +// +// } +// +// } + return Observable.from(entities); + + }) .filter( device -> { + logger.info("Filtering device: {}", device.getUuid()); + if(logger.isTraceEnabled()) { logger.trace("Filtering device: {}", device.getUuid()); } @@ -233,7 +282,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } if(logger.isTraceEnabled()) { - logger.trace("Push notification filter matched for notification {}, so removing from notification", + logger.trace("Push notification filter did not match for notification {}, so removing from notification", device.getUuid(), notification.getUuid()); } return false; @@ -271,7 +320,20 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }).subscribeOn(Schedulers.io()); - }, 10) + + }, 
100) + //.map( entityRef -> entityRef.getUuid() ) + //.buffer(10) +// .flatMap( uuids -> { +// +// if(logger.isTraceEnabled()) { +// logger.trace("Processing batch of {} device(s)", uuids.size()); +// } +// +// +// return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io()); +// +// }, 10) .doOnError(throwable -> {
