Repository: incubator-usergrid Updated Branches: refs/heads/sqs_queues d4c90d81a -> 9ee998bc2
fixing tests Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ee998bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ee998bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ee998bc Branch: refs/heads/sqs_queues Commit: 9ee998bc240992005ac32eb03a967196ee82e1ae Parents: d4c90d8 Author: Shawn Feldman <[email protected]> Authored: Mon Oct 6 10:50:37 2014 -0600 Committer: Shawn Feldman <[email protected]> Committed: Mon Oct 6 10:50:37 2014 -0600 ---------------------------------------------------------------------- .../queue/impl/SQSQueueManagerImpl.java | 2 +- .../services/notifications/QueueListener.java | 4 +- .../services/notifications/TaskManager.java | 57 ++++++++++---------- .../apns/NotificationsServiceIT.java | 20 ++++--- 4 files changed, 41 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ee998bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index c88128b..9d480ef 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -125,7 +125,7 @@ public class SQSQueueManagerImpl implements QueueManager { for (Message message : messages) { Object body ; try{ - body = fromString( message.getBody()); + body = fromString(message.getBody()); }catch (Exception e){ LOG.error("failed to deserialize message", e); body = message.getBody(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ee998bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index b0472ad..a381c70 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -67,7 +67,7 @@ public class QueueListener { private ExecutorService pool; private List<Future> futures; - public final String MAX_THREADS = "2"; + public final int MAX_THREADS = 2; private Integer batchSize = 10; private String queueName; @@ -93,7 +93,7 @@ public class QueueListener { batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize))); queueName = ApplicationQueueManager.getQueueNames(properties); - int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS)); + int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS)); futures = new ArrayList<Future>(maxThreads); //create our thread pool based on our threadcount. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ee998bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java index 7c70634..dc0fb05 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java @@ -147,35 +147,36 @@ public class TaskManager { } } - public void finishedBatch() throws Exception { - - long successes = this.successes.getAndSet(0); //reset counters - long failures = this.failures.getAndSet(0); //reset counters - this.hasFinished = true; - - // refresh notification - Notification notification = em.get(this.notification.getUuid(), Notification.class); - notification.setModified(System.currentTimeMillis()); - - long sent = successes, errors = failures; - //and write them out again, this will produce the most accurate count - Map<String, Long> stats = new HashMap<>(2); - stats.put("sent", sent); - stats.put("errors", errors); - notification.updateStatistics(successes, errors); - - //none of this is known and should you ever do this - if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) { - Map<String, Object> properties = new HashMap<>(); - notification.setFinished(notification.getModified()); - properties.put("finished", notification.getModified()); - properties.put("state", notification.getState()); - LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted()); - notification.addProperties(properties); - } - LOG.info("notification finished batch: {} of {} devices", notification.getUuid(),sent+errors); - em.update(notification); + public void finishedBatch() throws Exception { + synchronized (this) { + long successes = this.successes.getAndSet(0); //reset counters + long failures = this.failures.getAndSet(0); //reset counters + this.hasFinished = true; + + // refresh notification + Notification notification = em.get(this.notification.getUuid(), Notification.class); + notification.setModified(System.currentTimeMillis()); + + //and write them out again, this will produce the most accurate count + Map<String, Long> stats = new HashMap<>(2); + stats.put("sent", successes); + stats.put("errors", failures); + notification.updateStatistics(successes, successes); + + //none of this is known and should you ever do this + if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) { + Map<String, Object> properties = new HashMap<>(); + notification.setFinished(notification.getModified()); + properties.put("finished", notification.getModified()); + properties.put("state", notification.getState()); + LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted()); + notification.addProperties(properties); + } + + LOG.info("notification finished batch: {} of {} devices", notification.getUuid(), successes + failures); + em.update(notification); // Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups // proxy.asyncCheckForInactiveDevices(notifiers); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ee998bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java index 703b6df..faa5091 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java @@ -491,9 +491,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { @Test public void twoDevicesTwoNotifiers() throws Exception { + String notifier2Name = "apNs2"; // create a 2nd notifier // app.clear(); - app.put("name", "apNs2"); + app.put("name", notifier2Name); app.put("provider", PROVIDER); app.put("environment", "development"); InputStream fis = getClass().getClassLoader().getResourceAsStream( @@ -513,10 +514,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { assertEquals(notifier2.getProvider(), PROVIDER); assertEquals(notifier2.getEnvironment(), "development"); - String key = notifier.getName() + NOTIFIER_ID_POSTFIX; - String key2 = notifier2.getName() + NOTIFIER_ID_POSTFIX; + String key = notifierName + NOTIFIER_ID_POSTFIX; + String key2 = notifier2Name + NOTIFIER_ID_POSTFIX; device2.setProperty(key, null); - device2.setProperty(key2, null); + device2.setProperty(key2, PUSH_TOKEN); app.getEm().update(device2); app.getEm().refreshIndex(); @@ -526,8 +527,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.clear(); String payload = getPayload(); Map<String, String> payloads = new HashMap<String, String>(1); - payloads.put(notifier.getUuid().toString(), payload); - payloads.put(notifier2.getUuid().toString(), payload); + payloads.put(notifierName, payload); + payloads.put(notifier2Name, payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); @@ -536,11 +537,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.getEm().refreshIndex(); - Notification notification = app.getEm().get(e.getUuid(), - Notification.class); - assertEquals( - notification.getPayloads().get(notifier.getUuid().toString()), - payload); + Notification notification = app.getEm().get(e.getUuid(), Notification.class); + assertEquals(notification.getPayloads().get(notifierName), payload); // perform push // notification = scheduleNotificationAndWait(notification);
