Repository: usergrid Updated Branches: refs/heads/release-2.1.1 3df07791c -> 7af4f8454
Update event handling to better handle case when no index requests are returned from event processing. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7af4f845 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7af4f845 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7af4f845 Branch: refs/heads/release-2.1.1 Commit: 7af4f8454ff9f2270835a2983fa41c390d4602f5 Parents: 3df0779 Author: Michael Russo <[email protected]> Authored: Tue Jun 14 13:57:14 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Tue Jun 14 13:57:14 2016 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 31 +++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7af4f845/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index fa175ab..8d050fe 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -355,6 +355,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim()); } + if( single.isEmpty() ){ + logger.warn("No index operation messages came back from event processing for msg {} ", + message.getStringBody().trim()); + } + // if no exception happens and the QueueMessage is returned in these results, it will get ack'd return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime()); @@ -370,8 +375,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { } catch (Exception e) { + // NPEs don't have a detail message, so add something for our log statement to identify better + final String errorMessage; + if( e instanceof NullPointerException ) { + errorMessage = "NullPointerException"; + }else{ + errorMessage = e.getMessage(); + } + // if the event fails to process, log and return empty message result so it doesn't get ack'd - logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() ); + logger.error("{}. Failed to process message: {}", errorMessage, message.getStringBody().trim() ); return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime()); } }); @@ -427,7 +440,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); - return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); + // default this observable's return to empty index operation message if nothing is emitted + return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -453,9 +467,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() ); + // default this observable's return to empty index operation message if nothing is emitted return ecm.load( edgeIndexEvent.getEntityId() ) .flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) ) - .toBlocking().lastOrDefault(null); + .toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -487,7 +502,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); } - return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null); + // default this observable's return to empty index operation message if nothing is emitted + return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -600,9 +616,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId(); final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion(); + // default this observable's return to empty index operation message if nothing is emitted return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion ) - .toBlocking().lastOrDefault(null); - + .toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -675,7 +691,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); - return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); + // default this observable's return to empty index operation message if nothing is emitted + return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage()); }
