Introduce a graph iterator for segmenting push notifications.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32ab5da0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32ab5da0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32ab5da0 Branch: refs/heads/asf-site Commit: 32ab5da0af66d2dfc4ed9fdb8ddf84a6b0231c03 Parents: f064c49 Author: Michael Russo <[email protected]> Authored: Sat Apr 16 00:08:32 2016 +0200 Committer: Michael Russo <[email protected]> Committed: Sat Apr 16 00:08:32 2016 +0200 ---------------------------------------------------------------------- .../persistence/NotificationGraphIterator.java | 119 +++++++++++++++++++ .../apache/usergrid/persistence/PathQuery.java | 35 ++++-- .../persistence/entities/Notification.java | 62 +++++++++- .../notifications/NotificationDeviceFilter.java | 45 +++++++ .../notifications/NotificationsService.java | 6 + .../impl/ApplicationQueueManagerImpl.java | 101 ++++++++++++++-- 6 files changed, 352 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java new file mode 100644 index 0000000..b83f555 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.persistence; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +public class NotificationGraphIterator implements ResultsIterator, Iterable { + + private static final Logger logger = LoggerFactory.getLogger(NotificationGraphIterator.class); + + + EntityManager entityManager; + + private Iterator<EntityRef> source; + private Query query; + private Iterator currentIterator; + + + public NotificationGraphIterator(EntityManager entityManager, + Iterator<EntityRef> source, + Query query) { + + this.entityManager = entityManager; + this.source = source; + this.query = query; + + } + + @Override + public Iterator iterator() { + return this; + } + + @Override + public boolean hasNext() { + if (source == null) { + return false; + } + if (currentIterator != null && currentIterator.hasNext()) { + return true; + } + while (source.hasNext()) { + EntityRef ref = source.next(); + Results r = getResultsFor(ref); + if (r.size() > 0) { + currentIterator = new PagingResultsIterator(r, query.getResultsLevel()); + return currentIterator.hasNext(); + } + } + currentIterator = null; + source = null; + return false; + } + + + @Override + public Object next() { + + + + + return (currentIterator != null) ? currentIterator.next() : null; + } + + @Override + public boolean hasPages() { + return currentIterator != null && currentIterator instanceof ResultsIterator && ((ResultsIterator) currentIterator).hasPages(); + } + + + private Results getResultsFor(EntityRef ref) { + + try { + + if (query.getCollection() != null) { + + if(logger.isTraceEnabled()) { + logger.trace("Fetching with refType: {}, collection: {} with no query", + ref.getType(), query.getCollection()); + } + return entityManager.searchCollection(ref, query.getCollection(), null); + + } else { + + if(logger.isTraceEnabled()) { + logger.trace("Searching target entities with refType: {} for collection: {} with no query", + ref.getType(), query.getCollection()); + } + + query.setQl("select *"); + return entityManager.searchTargetEntities(ref, query); + + } + + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java index 55839a6..215f6ac 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java @@ -22,7 +22,6 @@ import java.util.Iterator; import java.util.UUID; import org.apache.usergrid.persistence.Query.Level; -import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.utils.InflectionUtils; @@ -52,11 +51,10 @@ public class PathQuery<E> { /** * top level - * - * @param head the top-level entity + * @param head the top-level entity * @param query the query - must have a collection or connectType value set */ - public PathQuery( EntityRef head, Query query ) { + public PathQuery(EntityRef head, Query query) { if ( query.getCollection() == null && query.getConnectionType() == null ) { throw new IllegalArgumentException( "Query must have a collection or connectionType value" ); } @@ -93,7 +91,7 @@ public class PathQuery<E> { return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() ); } else { - return new MultiQueryIterator( em, source.refIterator( em ), query ); + return new MultiQueryIterator( em, source.refIterator( em, false), query ); } } catch ( Exception e ) { @@ -101,6 +99,24 @@ public class PathQuery<E> { } } + public Iterator<E> graphIterator( EntityManager em ) { + try { + + if ( uuid != null && type != null ) { + return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() ); + }else { + + return new NotificationGraphIterator(em, source.refIterator(em, true), query); + } + + } + catch ( Exception e ) { + throw new RuntimeException( e ); + } + } + + + protected Results getHeadResults( EntityManager em ) throws Exception { @@ -123,7 +139,7 @@ public class PathQuery<E> { } - protected Iterator refIterator( EntityManager em ) throws Exception { + protected Iterator refIterator(EntityManager em, boolean useGraph) throws Exception { if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){ @@ -140,7 +156,12 @@ public class PathQuery<E> { q = new Query( q ); q.setResultsLevel( Level.REFS ); } - return new MultiQueryIterator( em, source.refIterator( em ), q ); + if( useGraph){ + return new NotificationGraphIterator( em, source.refIterator( em, true), q ); + }else{ + return new MultiQueryIterator( em, source.refIterator( em, false ), q ); + + } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java index 6a6e3fa..aca10cf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java @@ -26,6 +26,8 @@ import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.annotations.EntityCollection; import org.apache.usergrid.persistence.annotations.EntityProperty; import org.apache.usergrid.persistence.index.query.Identifier; +import org.apache.usergrid.utils.InflectionUtils; + /** * The entity class for representing Notifications. @@ -77,6 +79,10 @@ public class Notification extends TypedEntity { @EntityProperty protected Long expire; + /** Stores the number of devices processed */ + @EntityProperty + protected int deviceProcessedCount; + /** True if notification is canceled */ @EntityProperty protected Boolean canceled; @@ -89,6 +95,10 @@ public class Notification extends TypedEntity { @EntityProperty protected String priority; + /** Flag to signal Usergrid to use graph traversal + filtering to find devices */ + @EntityProperty + protected boolean useGraph; + /** Error messages that may have been encountered by Usergrid when trying to process the notification */ @EntityProperty protected String errorMessage; @@ -104,6 +114,9 @@ public class Notification extends TypedEntity { @EntityProperty protected Map<String, Long> statistics; + @EntityProperty + protected Map<String, Object> filters; + public Notification() { pathQuery = new PathTokens(); @@ -173,6 +186,15 @@ public class Notification extends TypedEntity { } @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + public int getDeviceProcessedCount() { + return deviceProcessedCount; + } + + public void setDeviceProcessedCount(int deviceProcessedCount) { + this.deviceProcessedCount = deviceProcessedCount; + } + + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public Boolean getCanceled() { return canceled; } @@ -191,6 +213,15 @@ public class Notification extends TypedEntity { } @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + public boolean getUseGraph() { + return useGraph; + } + + public void setUseGraph(boolean useGraph) { + this.useGraph = useGraph; + } + + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public boolean getDebug() { return debug; } @@ -252,6 +283,15 @@ public class Notification extends TypedEntity { this.statistics = statistics; } + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + public Map<String, Object> getFilters() { + return filters; + } + + public void setFilters(Map<String, Object> filters) { + this.filters = filters; + } + public void updateStatistics(long sent, long errors) { if (this.statistics == null) { this.statistics = new HashMap<String, Long>(2); @@ -341,7 +381,8 @@ public class Notification extends TypedEntity { @JsonIgnore public PathQuery<Device> buildPathQuery() { PathQuery pathQuery = null; - for (PathToken pathToken : getPathTokens()) { + List<PathToken> pathTokens = getPathTokens(); + for (PathToken pathToken : pathTokens) { String collection = pathToken.getCollection(); Query query = new Query(); if(pathToken.getQl() != null){ @@ -365,6 +406,25 @@ public class Notification extends TypedEntity { if (pathQuery == null) { pathQuery = new PathQuery(getApplicationRef(), query); + + if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE) )){ + + Query usersQuery = new Query(); + usersQuery.setQl("select *"); + usersQuery.setCollection("users"); + usersQuery.setLimit(100); + + Query devicesQuery = new Query(); + devicesQuery.setQl("select *"); + devicesQuery.setCollection("devices"); + usersQuery.setLimit(100); + + + // build up the chain so the proper iterators can be used later + pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery ); + + } + } else { pathQuery = pathQuery.chain(query); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java new file mode 100644 index 0000000..35700ea --- /dev/null +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.services.notifications; + + +public class NotificationDeviceFilter { + + + private String key; + private Object value; + + public NotificationDeviceFilter(String key, Object value){ + + this.key = key; + this.value = value; + + } + + public Object getValue(){ + + return this.value; + + } + + public String getKey(){ + + return this.key; + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java index 05c1cd7..50eb883 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java @@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications; import java.util.*; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +85,7 @@ public class NotificationsService extends AbstractCollectionService { private ServiceManagerFactory smf; private EntityManagerFactory emf; private QueueManagerFactory queueManagerFactory; + private EntityCollectionManagerFactory ecmf; public NotificationsService() { if (logger.isTraceEnabled()) { @@ -139,9 +141,13 @@ public class NotificationsService extends AbstractCollectionService { try { validate(null, context.getPayload()); Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters()); + // default saving of receipts + context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>())); + context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false)); context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true)); context.getProperties().put("processingFinished", 0L); // defaulting processing finished to 0 + context.getProperties().put("deviceProcessedCount", 0); // defaulting processing finished to 0 context.getProperties().put("state", Notification.State.CREATED); context.getProperties().put("pathQuery", pathTokens); context.setOwner(sm.getApplication()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 2466164..487ea1f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -19,7 +19,10 @@ package org.apache.usergrid.services.notifications.impl; import com.codahale.metrics.Meter; import org.apache.usergrid.batch.JobExecution; import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.entities.*; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.queue.QueueManager; @@ -97,14 +100,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues - //get devices in querystring, and make sure you have access if (pathQuery != null) { final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap(); if (logger.isTraceEnabled()) { logger.trace("notification {} start query", notification.getUuid()); } - final Iterator<Device> iterator = pathQuery.iterator(em); + + + // the main iterator can use graph traversal or index querying + final Iterator<Device> iterator; + if( notification.getUseGraph()){ + iterator = pathQuery.graphIterator(em); + }else{ + iterator = pathQuery.iterator(em); + } //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { @@ -167,6 +177,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }; + final Map<String, Object> filters = notification.getFilters(); Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator)) .flatMap(entity -> { @@ -180,10 +191,73 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }) .distinct(ref -> ref.getUuid()) + .flatMap( entityRef -> { + + return Observable.just(entityRef).flatMap( ref -> { + + if(logger.isTraceEnabled()){ + logger.trace("Loading device: {}", ref.getUuid()); + + } + try { + return Observable.just(em.get(ref, Device.class)); + } + catch (Exception e){ + + return Observable.empty(); + + } + + }).subscribeOn(Schedulers.io()); + + + }, 50) + .filter( device -> { + + if(logger.isTraceEnabled()) { + logger.trace("Filtering device: {}", device.getUuid()); + } + + + if(notification.getUseGraph() && filters.size() > 0 ) { + + for (Map.Entry<String, Object> entry : filters.entrySet()) { + + if ((device.getDynamicProperties().get(entry.getKey()) != null && + device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) || + + (device.getProperties().get(entry.getKey()) != null && + device.getProperties().get(entry.getKey()).equals(entry.getValue())) + + ) { + + + return true; + } + + } + + if(logger.isTraceEnabled()) { + logger.trace("Push notification filter matched for notification {}, so removing from notification", + device.getUuid(), notification.getUuid()); + } + return false; + + + } + + return true; + + }) .map(sendMessageFunction) .doOnNext( message -> { try { + if(message.isPresent()){ + + if(logger.isTraceEnabled()) { + logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); + } qm.sendMessage( message.get() ); queueMeter.mark(); } @@ -206,7 +280,10 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { try { notification.setProcessingFinished(System.currentTimeMillis()); + notification.setDeviceProcessedCount(deviceCount.get()); em.update(notification); + logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid()); + } catch (Exception e) { logger.error("Unable to set processing finished timestamp for notification"); } @@ -569,9 +646,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { while( initial || resultSize >= LIMIT){ initial = false; + final List<EntityRef> myusers = em.getCollection(ref, "users", start, LIMIT, Query.Level.REFS, true).getRefs(); - resultSize = myusers.size(); if(myusers.size() > 0){ @@ -579,13 +656,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } - // don't allow a single user to have more than 100 devices? - for (EntityRef user : myusers) { + Observable.from(myusers).flatMap( user -> { + + try { + devices.addAll(em.getCollection(user, "devices", null, 100, + Query.Level.REFS, true).getRefs()); + }catch (Exception e){ + logger.error ("Unable to fetch devices for user: {}", user.getUuid()); + } + return Observable.from(Collections.singletonList(user)); + + }, 50).toBlocking().lastOrDefault(null); + + - devices.addAll( em.getCollection(user, "devices", null, 100, - Query.Level.REFS, true).getRefs() ); - } }
