Update notification processing to allow more parallel work.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8cf78252 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8cf78252 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8cf78252 Branch: refs/heads/master Commit: 8cf782527f705109fdbedd8d8767e6074e42796a Parents: 32ab5da Author: Michael Russo <[email protected]> Authored: Sat Apr 16 14:40:20 2016 +0100 Committer: Michael Russo <[email protected]> Committed: Sat Apr 16 14:40:20 2016 +0100 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 47 ++++-- .../IllegalArgumentExceptionMapper.java | 4 +- .../notifications/NotificationsService.java | 27 ++-- .../impl/ApplicationQueueManagerImpl.java | 145 ++++++++++--------- 4 files changed, 128 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index b2330f3..750cf7b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -34,7 +34,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.usergrid.persistence.collection.EntitySet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -2511,23 +2513,42 @@ public class CpEntityManager implements EntityManager { @Override public Results getEntities( List<UUID> ids, String type ) { - ArrayList<Entity> entities = new ArrayList<Entity>(); - for ( UUID uuid : ids ) { - EntityRef ref = new SimpleEntityRef( type, uuid ); - Entity entity = null; - try { - entity = get( ref ); - } - catch ( Exception ex ) { - logger.warn( "Entity {}/{} not found", uuid, type ); - } - if ( entity != null ) { - entities.add( entity ); - } + List<Id> entityIds = new ArrayList<>(); + + for( UUID uuid : ids){ + + entityIds.add(new SimpleId( uuid, type )); + } + // leverage ecm.load so it's a batch fetch of all entities from Cassandra + EntitySet entitySet = ecm.load( entityIds ).toBlocking().last(); + + List<Entity> entities = entitySet.getEntities().stream().map( mvccEntity -> { + + if( mvccEntity.getEntity().isPresent() ){ + + org.apache.usergrid.persistence.model.entity.Entity cpEntity = mvccEntity.getEntity().get(); + + Class clazz = Schema.getDefaultSchema().getEntityClass( mvccEntity.getId().getType() ); + + Entity entity = EntityFactory.newEntity( mvccEntity.getId().getUuid(), mvccEntity.getId().getType(), clazz ); + entity.setProperties( cpEntity ); + + return entity; + + }else{ + + logger.warn("Tried fetching entity with id: {} and type: but was not found", + mvccEntity.getId().getUuid(), mvccEntity.getId().getType() ); + + return null; + } + }).collect(Collectors.toList()); + + return Results.fromEntities( entities ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java index ff7b656..e6243e9 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java @@ -34,7 +34,9 @@ public class IllegalArgumentExceptionMapper extends AbstractExceptionMapper<Ille @Override public Response toResponse( IllegalArgumentException e ) { - logger.error( "Illegal argument was passed, returning bad request to user", e ); + if(logger.isTraceEnabled()) { + logger.trace("Illegal argument was passed, returning bad request to user", e.getMessage()); + } return toResponse( BAD_REQUEST, e ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java index 50eb883..f4fdb65 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java @@ -20,6 +20,7 @@ package org.apache.usergrid.services.notifications; import java.util.*; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.services.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,15 +40,6 @@ import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.apache.usergrid.persistence.queue.QueueScope; import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; -import org.apache.usergrid.services.AbstractCollectionService; -import org.apache.usergrid.services.ServiceAction; -import org.apache.usergrid.services.ServiceContext; -import org.apache.usergrid.services.ServiceInfo; -import org.apache.usergrid.services.ServiceManagerFactory; -import org.apache.usergrid.services.ServiceParameter; -import org.apache.usergrid.services.ServicePayload; -import org.apache.usergrid.services.ServiceRequest; -import org.apache.usergrid.services.ServiceResults; import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException; import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl; @@ -85,7 +77,6 @@ public class NotificationsService extends AbstractCollectionService { private ServiceManagerFactory smf; private EntityManagerFactory emf; private QueueManagerFactory queueManagerFactory; - private EntityCollectionManagerFactory ecmf; public NotificationsService() { if (logger.isTraceEnabled()) { @@ -139,10 +130,20 @@ public class NotificationsService extends AbstractCollectionService { Timer.Context timer = postTimer.time(); postMeter.mark(); try { + validate(null, context.getPayload()); - Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters()); - // default saving of receipts + // perform some input validates on useGraph payload property vs. ql= path query + final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters(); + for (ServiceParameter parameter : parameters){ + if( parameter instanceof ServiceParameter.QueryParameter && context.getProperties().get("useGraph").equals(true)){ + throw new IllegalArgumentException("Query ql parameter cannot be used with useGraph:true property value"); + } + } + + Notification.PathTokens pathTokens = getPathTokens(parameters); + + // set defaults context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>())); context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false)); context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true)); @@ -175,7 +176,7 @@ public class NotificationsService extends AbstractCollectionService { // future: somehow return 202? return results; }catch (Exception e){ - logger.error("serialization failed",e); + logger.error(e.getMessage()); throw e; }finally { timer.stop(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 487ea1f..5ce1b93 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -58,7 +58,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once - public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) { + public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager, + QueueManager queueManager, MetricsFactory metricsFactory, + Properties properties) { this.em = entityManager; this.qm = queueManager; this.jobScheduler = jobScheduler; @@ -116,21 +118,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { iterator = pathQuery.iterator(em); } - //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule - if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { - if(logger.isTraceEnabled()){ - logger.trace("Scheduling notification job as it has multiple pages of devices."); - } - jobScheduler.scheduleQueueJob(notification, true); - em.update(notification); - return; - } +// //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule +// if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { +// if(logger.isTraceEnabled()){ +// logger.trace("Scheduling notification job as it has multiple pages of devices."); +// } +// jobScheduler.scheduleQueueJob(notification, true); +// em.update(notification); +// return; +// } final UUID appId = em.getApplication().getUuid(); final Map<String, Object> payloads = notification.getPayloads(); final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> { + try { + //logger.info("Preparing notification queue message for device: {}", deviceRef.getUuid()); + long now = System.currentTimeMillis(); String notifierId = null; @@ -163,6 +168,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setQueued(System.currentTimeMillis()); } + deviceCount.incrementAndGet(); return Optional.of(message); @@ -190,90 +196,95 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { return Observable.from(getDevices(entity)); }) - .distinct(ref -> ref.getUuid()) - .flatMap( entityRef -> { + .distinct(ref -> ref.getUuid() ) + .map( entityRef -> entityRef.getUuid() ) + .buffer(10) + .flatMap( uuids -> { - return Observable.just(entityRef).flatMap( ref -> { + if(logger.isTraceEnabled()) { + logger.trace("Processing batch of {} device(s)", uuids.size()); + } - if(logger.isTraceEnabled()){ - logger.trace("Loading device: {}", ref.getUuid()); - } - try { - return Observable.just(em.get(ref, Device.class)); - } - catch (Exception e){ - - return Observable.empty(); + return Observable.from(em.getEntities(uuids, "device")) + .filter( device -> { + if(logger.isTraceEnabled()) { + logger.trace("Filtering device: {}", device.getUuid()); } - }).subscribeOn(Schedulers.io()); + if(notification.getUseGraph() && filters.size() > 0 ) { - }, 50) - .filter( device -> { + for (Map.Entry<String, Object> entry : filters.entrySet()) { - if(logger.isTraceEnabled()) { - logger.trace("Filtering device: {}", device.getUuid()); - } + if ((device.getDynamicProperties().get(entry.getKey()) != null && + device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) || + (device.getProperties().get(entry.getKey()) != null && + device.getProperties().get(entry.getKey()).equals(entry.getValue())) - if(notification.getUseGraph() && filters.size() > 0 ) { + ) { - for (Map.Entry<String, Object> entry : filters.entrySet()) { - if ((device.getDynamicProperties().get(entry.getKey()) != null && - device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) || + return true; + } - (device.getProperties().get(entry.getKey()) != null && - device.getProperties().get(entry.getKey()).equals(entry.getValue())) + } - ) { + if(logger.isTraceEnabled()) { + logger.trace("Push notification filter matched for notification {}, so removing from notification", + device.getUuid(), notification.getUuid()); + } + return false; - return true; } - } + return true; - if(logger.isTraceEnabled()) { - logger.trace("Push notification filter matched for notification {}, so removing from notification", - device.getUuid(), notification.getUuid()); - } - return false; - - - } + }) + .map(sendMessageFunction) + .doOnNext( message -> { + try { - return true; + if(message.isPresent()){ - }) - .map(sendMessageFunction) - .doOnNext( message -> { - try { + if(logger.isTraceEnabled()) { + logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); + } + qm.sendMessage( message.get() ); + queueMeter.mark(); + } - if(message.isPresent()){ + } catch (IOException e) { - if(logger.isTraceEnabled()) { - logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); + if(message.isPresent()){ + logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", + message.get().getNotificationId(), message.get().getDeviceId()); + } + else{ + logger.error("Unable to queue notification as it's not present when trying to send to queue"); } - qm.sendMessage( message.get() ); - queueMeter.mark(); + } - } catch (IOException e) { - if(message.isPresent()){ - logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", - message.get().getNotificationId(), message.get().getDeviceId()); - } - else{ - logger.error("Unable to queue notification as it's not present when trying to send to queue"); - } + }).subscribeOn(Schedulers.io()); + }, 10) - } + .doOnError(throwable -> { + logger.error("Error while processing devices for notification : {}", notification.getUuid()); + notification.setProcessingFinished(-1L); + notification.setDeviceProcessedCount(deviceCount.get()); + logger.warn("Partial notification. Only {} devices processed for notification {}", + deviceCount.get(), notification.getUuid()); + try { + em.update(notification); + }catch (Exception e){ + logger.error("Error updating negative processing status when processing failed."); + } }) .doOnCompleted( () -> { @@ -282,16 +293,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setProcessingFinished(System.currentTimeMillis()); notification.setDeviceProcessedCount(deviceCount.get()); em.update(notification); - logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid()); + logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid()); } catch (Exception e) { logger.error("Unable to set processing finished timestamp for notification"); } - }) - .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable)); + }); - //TODO verify error handling here processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background }
