This is an automated email from the ASF dual-hosted git repository. timothyjward pushed a commit to branch fix/logging in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
commit 01a9dc85efdf0010615bc716a00f0911d0ee83c7 Author: Tim Ward <[email protected]> AuthorDate: Mon Dec 16 12:15:56 2024 +0000 Logging and debuggability improvements This commit adds proper logging into the Typed Events implementation making it easier to understand what's going on Signed-off-by: Tim Ward <[email protected]> --- .../aries/typedevent/bus/impl/EventConverter.java | 91 +++++++++++++++++++--- .../aries/typedevent/bus/impl/EventSelector.java | 39 ++++++++-- .../aries/typedevent/bus/impl/EventTask.java | 18 ++++- .../typedevent/bus/impl/MonitorEventTask.java | 17 ++-- .../apache/aries/typedevent/bus/impl/TypeData.java | 5 ++ .../bus/impl/TypedEventBusActivator.java | 16 ++-- .../typedevent/bus/impl/TypedEventBusImpl.java | 37 ++++++--- .../typedevent/bus/impl/TypedEventMonitorImpl.java | 13 ++++ .../aries/typedevent/bus/impl/TypedEventTask.java | 16 ++-- .../typedevent/bus/impl/UnhandledEventTask.java | 19 ++--- .../typedevent/bus/impl/UntypedEventTask.java | 19 ++--- 11 files changed, 222 insertions(+), 68 deletions(-) diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java index 6df162b..677dcd1 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java @@ -49,6 +49,8 @@ import org.osgi.util.converter.Converter; import org.osgi.util.converter.ConverterFunction; import org.osgi.util.converter.Converters; import org.osgi.util.converter.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is responsible for converting events to and from their "flattened" @@ -58,6 +60,8 @@ import org.osgi.util.converter.TypeReference; */ public class EventConverter { + private static final Logger _log = 
LoggerFactory.getLogger(EventConverter.class); + private static final TypeReference<List<Object>> LIST_OF_OBJECTS = new TypeReference<List<Object>>() { }; private static final TypeReference<Set<Object>> SET_OF_OBJECTS = new TypeReference<Set<Object>>() { @@ -105,6 +109,12 @@ public class EventConverter { .errorHandler(EventConverter::attemptRecovery).build(); } + /** + * Conversion for nested Map values + * @param o - the value to convert + * @param target - the target - should be Object.class + * @return + */ static Object convert(Object o, Type target) { if (target != Object.class || o == null) { @@ -115,11 +125,17 @@ public class EventConverter { // "Safe" classes use an identity transform if (safeClasses.contains(sourceClass)) { + if(_log.isDebugEnabled()) { + _log.debug("The object {} does not require conversion.", o); + } return o; } // "Special" types and Enums map to strings if (specialClasses.contains(sourceClass) || sourceClass.isEnum()) { + if(_log.isDebugEnabled()) { + _log.debug("The object {} will be mapped as a String.", o); + } return eventConverter.convert(o).sourceAs(Object.class).to(String.class); } @@ -127,6 +143,9 @@ public class EventConverter { // the relevant collection type containing objects, this // ensures we pick up any embedded lists of DTOs or enums if (o instanceof Collection) { + if(_log.isDebugEnabled()) { + _log.debug("The collection {} will be processed to map the contained objects.", o); + } if (o instanceof Set) { return eventConverter.convert(o).to(SET_OF_OBJECTS); } else { @@ -137,10 +156,16 @@ public class EventConverter { // As with collections we remap nested maps to clean up any // undesirable types in the keys or values if (o instanceof Map) { + if(_log.isDebugEnabled()) { + _log.debug("The map {} will be processed to remap the contained objects.", o); + } return eventConverter.convert(o).to(MAP_OF_OBJECT_TO_OBJECT); } if (sourceClass.isArray()) { + if(_log.isDebugEnabled()) { + _log.debug("The array {} will be processed 
to remap the contained objects.", o); + } int depth = 1; Class<?> arrayComponentType = sourceClass.getComponentType(); Class<?> actualComponentType = sourceClass.getComponentType(); @@ -149,12 +174,21 @@ public class EventConverter { actualComponentType = actualComponentType.getComponentType(); } if (safeClasses.contains(actualComponentType) || actualComponentType.isPrimitive()) { + if(_log.isDebugEnabled()) { + _log.debug("The array {} does not need conversion.", o); + } return o; } else if (actualComponentType.isEnum()) { + if(_log.isDebugEnabled()) { + _log.debug("The array {} will use String values.", o); + } // This becomes an n dimensional String array Class<?> stringArrayType = Array.newInstance(String.class, new int[depth]).getClass(); return eventConverter.convert(o).to(stringArrayType); } else { + if(_log.isDebugEnabled()) { + _log.debug("The array {} is complex and will be treated as a list.", o); + } // This is an array of something complicated, recursively turn it into a // list of something, then make it into an array of the right type List<Object> oList = eventConverter.convert(o).to(LIST_OF_OBJECTS); @@ -162,6 +196,9 @@ public class EventConverter { } } + if(_log.isDebugEnabled()) { + _log.debug("The object {} will be treated as a DTO.", o); + } // If we get here then treat the type as a DTO return eventConverter.convert(o).sourceAsDTO().to(MAP_WITH_STRING_KEYS); } @@ -171,15 +208,17 @@ public class EventConverter { Set<Object> errors = errorsBeingHandled.get(); if (errors.contains(o)) { - // TODO log the warning in a big way + if(_log.isWarnEnabled()) { + _log.warn("The map {} repeatedly failed conversion to type {}. This event cannot be converted", o, target); + } return ConverterFunction.CANNOT_HANDLE; } try { + if(_log.isWarnEnabled()) { + _log.warn("The map {} failed conversion to type {}. 
Conversion will be reattempted treating the target as a DTO", o, target); + } errors.add(o); - - // TODO log the warning in a big way - return eventConverter.convert(o).targetAsDTO().to(target); } finally { errors.remove(o); @@ -191,14 +230,14 @@ public class EventConverter { private final Object originalEvent; private final CustomEventConverter custom; - private Map<String, ?> untypedEventDataForFiltering; + private Map<String, Object> untypedEventDataForFiltering; private EventConverter(Object event, CustomEventConverter custom) { this.originalEvent = event; this.custom = custom; } - private EventConverter(Map<String, ?> untypedEvent, CustomEventConverter custom) { + private EventConverter(Map<String, Object> untypedEvent, CustomEventConverter custom) { this.originalEvent = untypedEvent; this.custom = custom; this.untypedEventDataForFiltering = untypedEvent; @@ -223,12 +262,29 @@ public class EventConverter { return f.matches(toTest); } - public Map<String, ?> toUntypedEvent() { + public Map<String, Object> toUntypedEvent() { if (untypedEventDataForFiltering == null) { - if(custom == null || - (untypedEventDataForFiltering = custom.toUntypedEvent(originalEvent)) == null ) { + if(custom != null) { + if(_log.isDebugEnabled()) { + _log.debug("Using custom converter to convert {} to untyped data", originalEvent); + } + untypedEventDataForFiltering = custom.toUntypedEvent(originalEvent); + if(untypedEventDataForFiltering == null) { + if(_log.isDebugEnabled()) { + _log.debug("Custom event converter could not convert event {}. 
Falling back to built-in conversion", + originalEvent); + } + untypedEventDataForFiltering = eventConverter.convert(originalEvent).sourceAsDTO().to(MAP_WITH_STRING_KEYS); + } else { + return untypedEventDataForFiltering; + } + } else { + if(_log.isDebugEnabled()) { + _log.debug("Converting {} to untyped data", originalEvent); + } untypedEventDataForFiltering = eventConverter.convert(originalEvent).sourceAsDTO().to(MAP_WITH_STRING_KEYS); } + } return untypedEventDataForFiltering; } @@ -238,15 +294,30 @@ public class EventConverter { Class<?> rawType = targetEventClass.getRawType(); Type genericTarget = targetEventClass.getType(); if (rawType.isInstance(originalEvent) && rawType == genericTarget) { + if(_log.isDebugEnabled()) { + _log.debug("No need to convert {} to {} as it is already an instance", originalEvent, rawType); + } return (T) originalEvent; } else { if(custom != null) { + if(_log.isDebugEnabled()) { + _log.debug("Using custom converter to convert {} to {}", originalEvent, genericTarget); + } Object result = custom.toTypedEvent(originalEvent, rawType, genericTarget); if(result != null) { return (T) result; } + if(_log.isDebugEnabled()) { + _log.debug("Custom event converter could not convert event {} to {}. 
Falling back to built-in conversion", + originalEvent, genericTarget); + } + return eventConverter.convert(originalEvent).targetAsDTO().to(genericTarget); + } else { + if(_log.isDebugEnabled()) { + _log.debug("Converting {} to {}", originalEvent, genericTarget); + } + return eventConverter.convert(originalEvent).targetAsDTO().to(genericTarget); } - return eventConverter.convert(originalEvent).targetAsDTO().to(genericTarget); } } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java index 08699f3..75f5408 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java @@ -22,9 +22,13 @@ import java.util.List; import java.util.function.Predicate; import org.osgi.framework.Filter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventSelector { + private static final Logger _log = LoggerFactory.getLogger(EventSelector.class); + /** The event filter **/ private final Filter filter; @@ -62,6 +66,10 @@ public class EventSelector { * @param filter */ public EventSelector(String topic, Filter filter) { + if(_log.isDebugEnabled()) { + _log.debug("Generating selector for topic {} with filter {}", topic, filter); + } + this.filter = filter; if(topic == null) { @@ -101,11 +109,20 @@ public class EventSelector { if(additionalSegments.isEmpty()) { if(isMultiLevelWildcard) { + if(_log.isDebugEnabled()) { + _log.debug("No single level wildcards for topic {}. Prefix matching \"{}\" will be used", topic, initial); + } topicMatcher = s -> s.startsWith(initial); } else { + if(_log.isDebugEnabled()) { + _log.debug("No single level wildcards for topic {}. 
Exact matching will be used", topic); + } topicMatcher = initial::equals; } } else { + if(_log.isDebugEnabled()) { + _log.debug("Single level wildcards detected for topic {}. Prefix matching \"{}\" will be used", topic, initial); + } topicMatcher = this::topicMatch; } } @@ -130,20 +147,30 @@ public class EventSelector { // Check the next segment startIdx += segment.length(); } else { + if(_log.isDebugEnabled()) { + _log.debug("Topic {} does not match selector with initial {} and addtionals {}", + topic, initial, additionalSegments); + } // Doesn't match the segment return false; } } - if(startIdx == topic.length()) { - // We consumed the whole topic so this is a match - return true; - } else if(isMultiLevelWildcard && topic.charAt(startIdx - 1) == '/') { - // We consumed a whole number of tokens and are multi-level + if(startIdx == topic.length() || + (isMultiLevelWildcard && topic.charAt(startIdx - 1) == '/')) { + if(_log.isDebugEnabled()) { + _log.debug("Topic {} matches selector with initial {} and addtionals {}", + topic, initial, additionalSegments); + } + // We consumed the whole topic, or the remaining tokens were + // accepted by a multi-level wildcard, so this is a match. 
return true; } } - + if(_log.isDebugEnabled()) { + _log.debug("Topic {} does not match selector with initial {} and addtionals {}", + topic, initial, additionalSegments); + } return false; } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java index 67b77fb..2681a10 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java @@ -17,6 +17,22 @@ package org.apache.aries.typedevent.bus.impl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + abstract class EventTask { - public abstract void notifyListener(); + + private static final Logger _log = LoggerFactory.getLogger(EventTask.class); + + public void notifyListener() { + try { + unsafeNotify(); + } catch (Exception e) { + // TODO blacklisting? 
+ _log.error("The event delivery failed for task type {}.", getClass(), e); + } + } + + protected abstract void unsafeNotify() throws Exception; + } \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/MonitorEventTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/MonitorEventTask.java index 1336cb5..93e0dc2 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/MonitorEventTask.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/MonitorEventTask.java @@ -17,10 +17,13 @@ package org.apache.aries.typedevent.bus.impl; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class MonitorEventTask extends EventTask { + private static final Logger _log = LoggerFactory.getLogger(MonitorEventTask.class); + private final String eventType; private final EventConverter eventData; private final TypedEventMonitorImpl monitorImpl; @@ -32,13 +35,11 @@ class MonitorEventTask extends EventTask { this.monitorImpl = monitorImpl; } - @SuppressWarnings("unchecked") @Override - public void notifyListener() { - try { - monitorImpl.event(eventType, (Map<String, Object>) eventData.toUntypedEvent()); - } catch (Exception e) { - // TODO log this, also blacklist? 
- } + protected void unsafeNotify() { + if(_log.isDebugEnabled()) { + _log.debug("Distributing event to the event monitor"); + } + monitorImpl.event(eventType, eventData.toUntypedEvent()); } } \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java index af58df0..4eadde7 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java @@ -49,4 +49,9 @@ public final class TypeData { public Type getType() { return type; } + + @Override + public String toString() { + return "TypeData [type=" + type + "]"; + } } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java index 28468d0..428aadb 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java @@ -61,8 +61,8 @@ public class TypedEventBusActivator implements BundleActivator { @Override public void start(BundleContext bundleContext) throws Exception { - if (_log.isDebugEnabled()) { - _log.debug("Aries Typed Event Bus Starting"); + if (_log.isInfoEnabled()) { + _log.info("Aries Typed Event Bus Starting"); } eventBus = coalesce( @@ -73,8 +73,8 @@ public class TypedEventBusActivator implements BundleActivator { .flatMap(configuration -> createProgram(configuration)) .run(bundleContext); - if (_log.isDebugEnabled()) { - _log.debug("Aries Typed Event Bus Started"); + if (_log.isInfoEnabled()) { + _log.info("Aries Typed Event Bus Started"); } } 
@@ -155,14 +155,14 @@ public class TypedEventBusActivator implements BundleActivator { @Override public void stop(BundleContext context) throws Exception { - if (_log.isDebugEnabled()) { - _log.debug("Aries Typed Event Bus Stopping"); + if (_log.isInfoEnabled()) { + _log.info("Aries Typed Event Bus Stopping"); } eventBus.close(); - if (_log.isDebugEnabled()) { - _log.debug("Aries Typed Event Bus Stopped"); + if (_log.isInfoEnabled()) { + _log.info("Aries Typed Event Bus Stopped"); } } } \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java index 75211b0..c36c243 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java @@ -55,11 +55,15 @@ import org.osgi.service.typedevent.TypedEventHandler; import org.osgi.service.typedevent.UnhandledEventHandler; import org.osgi.service.typedevent.UntypedEventHandler; import org.osgi.util.converter.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.osgi.service.typedevent.TypedEventBus", uses=TypedEventBus.class) @Capability(namespace=IMPLEMENTATION_NAMESPACE, name=TYPED_EVENT_IMPLEMENTATION, version=TYPED_EVENT_SPECIFICATION_VERSION, uses=TypedEventBus.class) public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { + private static final Logger _log = LoggerFactory.getLogger(TypedEventBusImpl.class); + private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() { }; @@ -149,6 +153,9 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { } customEventConverter = converter; } + 
if(_log.isInfoEnabled()) { + _log.info("A Custom Event Converter was registered"); + } } void addTypedEventHandler(Bundle registeringBundle, TypedEventHandler<?> handler, Map<String, Object> properties) { @@ -166,8 +173,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { try { genType = registeringBundle.loadClass(String.valueOf(type)); } catch (ClassNotFoundException e) { - // TODO Blow up - e.printStackTrace(); + _log.error("Unable to load the declared event type {} from bundle {}", type, registeringBundle, e); } } else { Class<?> toCheck = handler.getClass(); @@ -195,7 +201,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { } return typeData.getRawType(); } else { - // TODO log + _log.error("Unable to determine the declared event type for service {} from bundle {}", getServiceId(properties), registeringBundle); throw new IllegalArgumentException("Unable to determine handled type for " + handler); } } @@ -238,7 +244,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { List<String> topicList; if (prop == null) { if (defaultTopic == null) { - // TODO log a broken handler + _log.error("Unable to determine the registered topics for service {}", getServiceId(properties)); return; } else { topicList = Collections.singletonList(defaultTopic); @@ -251,7 +257,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { .filter(s -> { String msg = checkTopicSyntax(s, true); if(msg != null) { - // TODO log this + _log.warn("The topic filter string {} from service {} is not valid: {}", s, getServiceId(properties), msg); } return msg == null; }) @@ -263,8 +269,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { try { f = getFilter(serviceId, properties); } catch (IllegalArgumentException e) { - // TODO Log a broken handler - e.printStackTrace(); + _log.error("The event filter from service {} is not valid", 
getServiceId(properties), e); return; } @@ -289,6 +294,9 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { handlers.put(handler, selector); } } + if(_log.isDebugEnabled()) { + _log.debug("Added service {} as an event handler for topics {}", serviceId, topicList); + } } void removeTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) { @@ -333,6 +341,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, Map<String, Map<T, U>> wildcardMap, Map<Long, T> idMap, T handler, Long serviceId) { + List<String> loggable; synchronized (lock) { List<String> consumed = knownHandlers.remove(serviceId); idMap.remove(serviceId); @@ -355,8 +364,14 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { } } }); + loggable = new ArrayList<String>(consumed); + } else { + loggable = Collections.emptyList(); } } + if(_log.isDebugEnabled()) { + _log.debug("Removed service {} as an event handler for topics {}", serviceId, loggable); + } } void updatedTypedEventHandler(Bundle registeringBundle, Map<String, Object> properties) { @@ -487,8 +502,9 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { deliveryTasks.addAll(wildcardDeliveries); if (deliveryTasks.isEmpty()) { - // TODO log properly - System.out.println("Unhandled Event Handlers are being used for event sent to topic " + topic); + if(_log.isDebugEnabled()) { + _log.debug("Unhandled Event Handlers are being used for event sent to topic {}", topic); + } deliveryTasks = unhandledEventHandlers.stream() .map(handler -> new UnhandledEventTask(topic, convertibleEventData, handler)).collect(toList()); } @@ -622,8 +638,7 @@ public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents { try { take = queue.take(); } catch (InterruptedException e) { - // TODO log the interrupt and continue - e.printStackTrace(); + _log.info("The {} was 
interrupted while waiting for events", Thread.currentThread().getName(), e); continue; } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java index 309f384..812a5e1 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java @@ -37,10 +37,14 @@ import org.osgi.util.pushstream.PushStreamProvider; import org.osgi.util.pushstream.PushbackPolicyOption; import org.osgi.util.pushstream.QueuePolicyOption; import org.osgi.util.pushstream.SimplePushEventSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, attribute = "objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor", uses = TypedEventMonitor.class) public class TypedEventMonitorImpl implements TypedEventMonitor { + private static final Logger _log = LoggerFactory.getLogger(TypedEventMonitorImpl.class); + private final LinkedList<MonitorEvent> historicEvents = new LinkedList<MonitorEvent>(); private final ExecutorService monitoringWorker; @@ -116,10 +120,14 @@ public class TypedEventMonitorImpl implements TypedEventMonitor { for (MonitorEvent me : list) { try { if (pec.accept(PushEvent.data(me)) < 0) { + if(_log.isDebugEnabled()) { + _log.debug("Historical event delivery halted by the consumer"); + } return () -> { }; } } catch (Exception e) { + _log.warn("An error occurred delivering historical events", e); return () -> { }; } @@ -140,6 +148,7 @@ public class TypedEventMonitorImpl implements TypedEventMonitor { while (it.hasNext()) { MonitorEvent next = it.next(); if (next.publicationTime.isAfter(since)) { + // Go back one so we don't lose the first match it.previous(); break; } @@ -148,10 +157,14 @@ 
public class TypedEventMonitorImpl implements TypedEventMonitor { while (it.hasNext()) { try { if (pec.accept(PushEvent.data(it.next())) < 0) { + if(_log.isDebugEnabled()) { + _log.debug("Historical event delivery halted by the consumer"); + } return () -> { }; } } catch (Exception e) { + _log.warn("An error occurred delivering historical events", e); return () -> { }; } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java index 40cd64c..e1a1765 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java @@ -18,8 +18,13 @@ package org.apache.aries.typedevent.bus.impl; import org.osgi.service.typedevent.TypedEventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class TypedEventTask extends EventTask { + + private static final Logger _log = LoggerFactory.getLogger(TypedEventTask.class); + private final String topic; private final TypeData targetEventClass; private final EventConverter eventData; @@ -36,11 +41,10 @@ class TypedEventTask extends EventTask { } @Override - public void notifyListener() { - try { - eventProcessor.notify(topic, eventData.toTypedEvent(targetEventClass)); - } catch (Exception e) { - // TODO log this, also blacklist? 
- } + public void unsafeNotify() { + if(_log.isDebugEnabled()) { + _log.debug("Distributing event data {} to the typed handler {}", eventData, eventProcessor); + } + eventProcessor.notify(topic, eventData.toTypedEvent(targetEventClass)); } } \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UnhandledEventTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UnhandledEventTask.java index df2f495..427d36e 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UnhandledEventTask.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UnhandledEventTask.java @@ -17,11 +17,14 @@ package org.apache.aries.typedevent.bus.impl; -import java.util.Map; - import org.osgi.service.typedevent.UnhandledEventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class UnhandledEventTask extends EventTask { + + private static final Logger _log = LoggerFactory.getLogger(UnhandledEventTask.class); + private final String topic; private final EventConverter eventData; private final UnhandledEventHandler eventProcessor; @@ -33,13 +36,11 @@ class UnhandledEventTask extends EventTask { this.eventProcessor = eventProcessor; } - @SuppressWarnings("unchecked") @Override - public void notifyListener() { - try { - eventProcessor.notifyUnhandled(topic, (Map<String, Object>) eventData.toUntypedEvent()); - } catch (Exception e) { - // TODO log this, also blacklist? 
- } + public void unsafeNotify() { + if(_log.isDebugEnabled()) { + _log.debug("Distributing event data {} to the unhandled event handler {}", eventData, eventProcessor); + } + eventProcessor.notifyUnhandled(topic, eventData.toUntypedEvent()); } } \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedEventTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedEventTask.java index 884e4be..0be4fe8 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedEventTask.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedEventTask.java @@ -17,11 +17,14 @@ package org.apache.aries.typedevent.bus.impl; -import java.util.Map; - import org.osgi.service.typedevent.UntypedEventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class UntypedEventTask extends EventTask { + + private static final Logger _log = LoggerFactory.getLogger(UntypedEventTask.class); + private final String topic; private final EventConverter eventData; private final UntypedEventHandler eventProcessor; @@ -33,13 +36,11 @@ class UntypedEventTask extends EventTask { this.eventProcessor = eventProcessor; } - @SuppressWarnings("unchecked") @Override - public void notifyListener() { - try { - eventProcessor.notifyUntyped(topic, (Map<String, Object>) eventData.toUntypedEvent()); - } catch (Exception e) { - // TODO log this, also blacklist? - } + public void unsafeNotify() { + if(_log.isDebugEnabled()) { + _log.debug("Distributing event data {} to the untyped handler {}", eventData, eventProcessor); + } + eventProcessor.notifyUntyped(topic, eventData.toUntypedEvent()); } } \ No newline at end of file
