Repository: usergrid Updated Branches: refs/heads/release-2.1.1 17e9b36e5 -> 112748010
Push notification queuing to be async. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11274801 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11274801 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11274801 Branch: refs/heads/release-2.1.1 Commit: 11274801059723b4a2264e01f45141080a47511a Parents: 17e9b36 Author: Michael Russo <[email protected]> Authored: Wed Apr 13 15:51:08 2016 +0200 Committer: Michael Russo <[email protected]> Committed: Wed Apr 13 15:51:08 2016 +0200 ---------------------------------------------------------------------- .../persistence/entities/Notification.java | 10 ----- .../impl/ApplicationQueueManagerImpl.java | 45 +++++++------------- 2 files changed, 16 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java index 5c3ee89..dc7c989 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java @@ -41,11 +41,6 @@ public class Notification extends TypedEntity { NORMAL, HIGH } - - /** Total count of notifications sent based on the API path/query */ - @EntityProperty - protected int expectedCount; - /** The pathQuery/query that Usergrid used to idenitfy the devices to send the notification to */ @EntityProperty private PathTokens pathQuery; @@ -107,11 +102,6 @@ public class Notification extends TypedEntity { } @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) - public int getExpectedCount() { return expectedCount; } - - public void setExpectedCount(int expectedCount) { this.expectedCount = expectedCount; } - - @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public PathTokens getPathQuery(){ return pathQuery; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index fb4d64c..f819e39 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -20,10 +20,7 @@ import com.codahale.metrics.Meter; import org.apache.usergrid.batch.JobExecution; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.entities.Device; -import org.apache.usergrid.persistence.entities.Notification; -import org.apache.usergrid.persistence.entities.Notifier; -import org.apache.usergrid.persistence.entities.Receipt; +import org.apache.usergrid.persistence.entities.*; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueMessage; @@ -33,6 +30,7 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.Subscriber; import rx.functions.Func1; +import rx.schedulers.Schedulers; import java.io.IOException; import java.util.*; @@ -174,15 +172,18 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { //process up to 10 concurrently Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator)) .flatMap(entity -> { + + if(entity.getType().equals(Device.ENTITY_TYPE)){ + return Observable.from(Collections.singletonList(entity)); + } + + // if it's not a device, drill down and get them return Observable.from(getDevices(entity)); - }, 10) + + }, 50) .distinct(ref -> ref.getUuid()) .map(sendMessageFunction) - .buffer(100) - .doOnNext( applicationQueueMessages -> { - - applicationQueueMessages.forEach( message -> { - + .doOnNext( message -> { try { if(message.isPresent()){ qm.sendMessage( message.get() ); @@ -201,13 +202,12 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } - }); - }) .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable)); - processMessagesObservable.toBlocking().lastOrDefault(null); // let this run and block the async thread, messages are queued + //TODO verify error handling here + processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background } @@ -221,7 +221,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } } - notification.setExpectedCount(deviceCount.get()); notification.addProperties(properties); em.update(notification); @@ -491,14 +490,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private boolean isOkToSend(Notification notification) { - Map<String, Long> stats = notification.getStatistics(); - if (stats != null && notification.getExpectedCount() == (stats.get("sent") + stats.get("errors"))) { - if (logger.isDebugEnabled()) { - logger.debug("notification {} already processed. not sending.", - notification.getUuid()); - } - return false; - } + if (notification.getCanceled() == Boolean.TRUE) { if (logger.isDebugEnabled()) { logger.debug("notification {} canceled. not sending.", @@ -522,14 +514,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final int LIMIT = Query.MID_LIMIT; - try { - if ("device".equals(ref.getType())) { - - devices = Collections.singletonList(ref); - - } else if ("user".equals(ref.getType())) { + if (User.ENTITY_TYPE.equals(ref.getType())) { UUID start = null; boolean initial = true; @@ -551,7 +538,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } - } else if ("group".equals(ref.getType())) { + } else if (Group.ENTITY_TYPE.equals(ref.getType())) { UUID start = null; boolean initial = true;
