Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 6488b05fb -> f272af2f3
Add a separate executor pool for async processing instead of unbounded Schedulers.io() Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f272af2f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f272af2f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f272af2f Branch: refs/heads/release-2.1.1 Commit: f272af2f3b41ce6ded649ffcd7410f23e33587fb Parents: 6488b05 Author: Michael Russo <[email protected]> Authored: Wed Apr 20 19:50:41 2016 +0100 Committer: Michael Russo <[email protected]> Committed: Wed Apr 20 19:50:41 2016 +0100 ---------------------------------------------------------------------- .../services/notifications/TaskManager.java | 96 +++++++++----------- .../impl/ApplicationQueueManagerImpl.java | 86 +++++++++++++----- 2 files changed, 105 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java index ce2b82c..531ca7c 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java @@ -37,12 +37,10 @@ public class TaskManager { private AtomicLong successes = new AtomicLong(); private AtomicLong failures = new AtomicLong(); private EntityManager em; - private boolean hasFinished; public TaskManager(EntityManager em, Notification notification) { this.em = em; this.notification = notification; - hasFinished = false; } public long getSuccesses(){return successes.get();} @@ -53,77 +51,69 
@@ public class TaskManager { completed(notifier,null,deviceUUID,null); } public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("REMOVED {}", deviceUUID); - } + + successes.incrementAndGet(); + try { - if (logger.isTraceEnabled()) { - logger.trace("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID); - } EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID); + if (receipt != null) { - if (logger.isTraceEnabled()) { - logger.trace("notification {} sent to device {}. saving receipt.", notification.getUuid(), deviceUUID); - } + receipt.setSent(System.currentTimeMillis()); this.saveReceipt(notification, deviceRef, receipt,false); if (logger.isTraceEnabled()) { - logger.trace("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID); + logger.trace("Notification {} receipt saved for device {}", notification.getUuid(), deviceUUID); } - successes.incrementAndGet(); + } if (newProviderId != null) { if (logger.isTraceEnabled()) { - logger.trace("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID); + logger.trace("Notification {} replacing notifier id for device {} ", notification.getUuid(), deviceUUID); } replaceProviderId(deviceRef, notifier, newProviderId); } if (logger.isTraceEnabled()) { - logger.trace("notification {} completed device {}", notification.getUuid(), deviceUUID); + logger.trace("Notification {} sending completed for device {}", notification.getUuid(), deviceUUID); } - } finally { - if (logger.isTraceEnabled()) { - logger.trace("COUNT is: {}", successes.get()); - } -// if (hasFinished) { //process has finished but notifications are still coming in -// finishedBatch(); -// -// } + } catch(Exception e) { + + logger.error("Unable to mark notification {} as completed due to: {}", notification.getUuid(), e); + } } public void failed(Notifier 
notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception { + failures.incrementAndGet(); + try { if (logger.isDebugEnabled()) { - logger.debug("notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code); + logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code); } - failures.incrementAndGet(); - if(receipt!=null) { - if ( receipt.getUuid() != null ) { - successes.decrementAndGet(); - } + if(receipt != null) { receipt.setErrorCode( code ); receipt.setErrorMessage( message ); this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID ), receipt, true ); - if ( logger.isDebugEnabled() ) { - logger.debug( "notification {} receipt saved for device {}", notification.getUuid(), deviceUUID ); - } } - } finally { + completed(notifier, deviceUUID); finishedBatch(); + + } catch (Exception e){ + + logger.error("Unable to finish marking notification {} as failed due to error: ", notification.getUuid(), e); + } } - /* - * called from TaskManager - creates a persistent receipt and updates the - * passed one w/ the UUID + /** + * Called from TaskManager - Creates a persistent receipt + * */ private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt, boolean hasError) throws Exception { @@ -142,11 +132,16 @@ public class TaskManager { } else { em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt); } + + if ( logger.isDebugEnabled() ) { + logger.debug( "Notification {} receipt saved for device {}", notification.getUuid(), device.getUuid() ); + } + } } - protected void replaceProviderId(EntityRef device, Notifier notifier, + private void replaceProviderId(EntityRef device, Notifier notifier, String newProviderId) throws Exception { Object value = em.getProperty(device, notifier.getName() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX); @@ -161,33 +156,24 @@ public class TaskManager { } } - public void 
finishedBatch() throws Exception { - finishedBatch(true); - } - public void finishedBatch(boolean refreshNotification) throws Exception { - - long successes = this.successes.get(); //reset counters - long failures = this.failures.get(); //reset counters + public void finishedBatch() throws Exception { - for (int i = 0; i < successes; i++) { - this.successes.decrementAndGet(); - } - for (int i = 0; i < failures; i++) { - this.failures.decrementAndGet(); - } + long successes = this.successes.get(); + long failures = this.failures.get(); - this.hasFinished = true; + // reset the counters + this.successes.set(0); + this.failures.set(0); - // force refresh notification by fetching it - if (refreshNotification) { - notification = em.get(this.notification.getUuid(), Notification.class); - } + // get the latest notification info + notification = em.get(this.notification.getUuid(), Notification.class); notification.updateStatistics(successes, failures); notification.setModified(System.currentTimeMillis()); notification.setFinished(notification.getModified()); em.update(notification); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 2f39ae4..1bb92b7 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.impl; import com.codahale.metrics.Meter; import org.apache.usergrid.batch.JobExecution; 
import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.entities.*; import org.apache.usergrid.persistence.Query; @@ -52,11 +53,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private final Meter queueMeter; private final Meter sendMeter; + private final static String PUSH_PROCESSING_MAXTHREADS_PROP = "usergrid.push.async.processing.threads"; + private final static String PUSH_PROCESSING_QUEUESIZE_PROP = "usergrid.push.async.processing.queue.size"; private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency"; HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once + + private final ExecutorService asyncExecutor; + + + + public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) { @@ -65,8 +74,31 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { this.jobScheduler = jobScheduler; this.metricsFactory = metricsFactory; this.queueName = getQueueNames(properties); - queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); - sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); + this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); + this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); + + int maxAsyncThreads; + int workerQueueSize; + + try { + + maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200")); + workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000")); + + } catch (Exception e){ + + // if junk is passed into the property, just default 
the values + maxAsyncThreads = 200; + workerQueueSize = 2000; + + } + + + // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks + this.asyncExecutor = TaskExecutorFactory + .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize, + TaskExecutorFactory.RejectionAction.CALLERRUNS ); + } @@ -296,7 +328,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } - }).subscribeOn(Schedulers.io()); + }).subscribeOn(Schedulers.from(asyncExecutor)); }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50"))) .doOnError(throwable -> { @@ -327,7 +359,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }); - processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background + processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background } @@ -348,7 +380,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { // if no devices, go ahead and mark the batch finished if (deviceCount.get() <= 0 ) { TaskManager taskManager = new TaskManager(em, notification); - taskManager.finishedBatch(true); + taskManager.finishedBatch(); } @@ -540,32 +572,43 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { /** - * Validates that a notifier and adapter exists to send notifications to; - * {"winphone":"mymessage","apple":"mymessage"} - * TODO: document this method better + * Validates that a notifier and adapter exists to send notifications to. For the example payload + * + * { "payloads" : {"winphone":"mymessage","apple":"mymessage"} } + * + * Notifiers with name "winphone" and "apple" must exist. 
*/ - private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> - notifierMap) throws Exception { - Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size()); + private Map<String, Object> translatePayloads(Map<String, Object> payloads, + Map<Object, ProviderAdapter> notifierMap) throws Exception { + + final Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size()); + for (Map.Entry<String, Object> entry : payloads.entrySet()) { + String payloadKey = entry.getKey().toLowerCase(); Object payloadValue = entry.getValue(); + //look for adapter from payload map ProviderAdapter providerAdapter = notifierMap.get(payloadKey); if (providerAdapter != null) { + //translate payload to usable information Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null; if (translatedPayload != null) { translatedPayloads.put(payloadKey, translatedPayload); } + } } return translatedPayloads; } public static String getQueueNames(Properties properties) { - String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME); + + String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, + ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME); return name; + } private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> { @@ -585,15 +628,15 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { try { while (!subscriber.isUnsubscribed() && input.hasNext()) { + //send our input to the next - //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName()); subscriber.onNext((T) input.next()); + } //tell the subscriber we don't have any more data - //logger.debug("finished iterator: {}", input.getClass().getSimpleName()); - subscriber.onCompleted(); + } catch (Throwable t) { 
logger.error("failed on subscriber", t); subscriber.onError(t); @@ -617,10 +660,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } } } catch (Exception e) { - logger.error("checkForInactiveDevices", e); // not - // essential so - // don't fail, - // but log + // not essential so don't fail, but log + logger.error("checkForInactiveDevices", e); + } } } @@ -630,14 +672,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if (notification.getCanceled() == Boolean.TRUE) { if (logger.isDebugEnabled()) { - logger.debug("notification {} canceled. not sending.", + logger.debug("Notification {} canceled. Not sending.", notification.getUuid()); } return false; } if (notification.isExpired()) { if (logger.isDebugEnabled()) { - logger.debug("notification {} expired. not sending.", + logger.debug("Notification {} expired. Not sending.", notification.getUuid()); } return false; @@ -654,7 +696,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } return value != null ? value.toString() : null; } catch (Exception e) { - logger.error("Error getting provider ID, proceeding with rest of batch", e); + logger.error("Error getting notifier for device {}, proceeding with rest of batch", device, e); return null; } }
