First pass to allow more than 1k notifications to be sent for groups/queries targeting users and devices.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8e4d7eef Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8e4d7eef Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8e4d7eef Branch: refs/heads/master Commit: 8e4d7eef2ca0967491e9ef863c78880e002575d1 Parents: 74de4bc Author: Michael Russo <[email protected]> Authored: Sun Apr 10 23:20:10 2016 +0300 Committer: Michael Russo <[email protected]> Committed: Sun Apr 10 23:20:10 2016 +0300 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 34 ++++--- .../services/notifications/TaskManager.java | 7 +- .../impl/ApplicationQueueManagerImpl.java | 100 ++++++++++++++++--- 3 files changed, 113 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 67e92f8..b5a4107 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -17,14 +17,7 @@ package org.apache.usergrid.corepersistence; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -318,14 +311,31 @@ public class CpRelationManager implements RelationManager { final String ql; - if ( startResult != null ) { - ql = "select * where created > " + startResult.timestamp(); - } - else { + + if (startResult != null ) { + + // UUID timestamp is a different measure than 'created' field on entities + Calendar uuidEpoch = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + uuidEpoch.clear(); + uuidEpoch.set(1582, 9, 15, 0, 0, 0); // 9 = October + long epochMillis = uuidEpoch.getTime().getTime(); + + long time = (startResult.timestamp() / 10000L) + epochMillis; + + if ( !reversed ) { + ql = "select * where created > " + time; + } else { + ql = "select * where created < " + time; + } + + } else { ql = "select *"; } Query query = Query.fromQL( ql ); + if(query == null ){ + throw new RuntimeException("Unable to get data for collection: "+collectionName); + } query.setLimit( count ); query.setReversed( reversed ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java index 3e78210..954724f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java @@ -131,13 +131,14 @@ public class TaskManager { } if ( debug || hasError) { + + List<EntityRef> entities = Arrays.asList(notification, device); + if (receipt.getUuid() == null) { Receipt savedReceipt = em.create(receipt); - receipt.setUuid(savedReceipt.getUuid()); - List<EntityRef> entities = Arrays.asList(notification, device); em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt); } else { - em.update(receipt); + em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 12a47b6..04e60b7 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -34,6 +34,7 @@ import rx.Observable; import rx.Subscriber; import rx.functions.Func1; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -119,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final UUID appId = em.getApplication().getUuid(); final Map<String, Object> payloads = notification.getPayloads(); - final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef -> { + final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> { try { long now = System.currentTimeMillis(); @@ -143,7 +144,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } if (notifierId == null) { - return deviceRef; + //TODO need to leverage optional here + //return deviceRef; } ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); @@ -153,16 +155,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setQueued(System.currentTimeMillis()); } - qm.sendMessage(message); deviceCount.incrementAndGet(); - queueMeter.mark(); + + return message; } catch (Exception deviceLoopException) { logger.error("Failed to add device", deviceLoopException); errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException); + + //TODO need an optional here + return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test"); } - return deviceRef; + }; @@ -174,9 +179,28 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }, 10) .distinct(ref -> ref.getUuid()) .map(sendMessageFunction) + .buffer(100) + .doOnNext( applicationQueueMessages -> { + + applicationQueueMessages.forEach( message -> { + + try { + + qm.sendMessage( message ); + queueMeter.mark(); + + } catch (IOException e) { + logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", + message.getNotificationId(), message.getDeviceId()); + } + + }); + + + }) .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable)); - processMessagesObservable.toBlocking().lastOrDefault(null); + processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued } @@ -487,20 +511,70 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private List<EntityRef> getDevices(EntityRef ref) { - List<EntityRef> devices = Collections.EMPTY_LIST; + List<EntityRef> devices = new ArrayList<>(); + try { + if ("device".equals(ref.getType())) { + devices = Collections.singletonList(ref); + } else if ("user".equals(ref.getType())) { - devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT, - Query.Level.REFS, false).getRefs(); + + UUID start = null; + boolean initial = true; + int resultSize = 0; + while( initial || resultSize >= Query.DEFAULT_LIMIT) { + + initial = false; + + final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT, + Query.Level.REFS, true).getRefs(); + + resultSize = mydevices.size(); + if(mydevices.size() > 0){ + start = mydevices.get(mydevices.size() - 1 ).getUuid(); + } + + + devices.addAll( mydevices ); + + + } + } else if ("group".equals(ref.getType())) { - devices = new ArrayList<>(); - for (EntityRef r : em.getCollection(ref, "users", null, - Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) { - devices.addAll(getDevices(r)); + + //devices = new ArrayList<>(); + UUID start = null; + boolean initial = true; + int resultSize = 0; + + while( initial || resultSize >= Query.DEFAULT_LIMIT){ + + initial = false; + final List<EntityRef> myusers = em.getCollection(ref, "users", start, + Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs(); + + resultSize = myusers.size(); + if(myusers.size() > 0){ + start = myusers.get(myusers.size() - 1 ).getUuid(); + } + + + // don't allow a single user to have more than 100 devices? + for (EntityRef user : myusers) { + + devices.addAll( em.getCollection(user, "devices", null, 100, + Query.Level.REFS, true).getRefs() ); + + + } + } + + + } } catch (Exception e) {
