Adapt to new router interface
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/9b895c0d Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/9b895c0d Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/9b895c0d Branch: refs/heads/master Commit: 9b895c0de252ec4379bcfc39cd7e6c81b2e6a312 Parents: 7c7b771 Author: Carlos Sierra <[email protected]> Authored: Wed Oct 4 14:26:11 2017 +0200 Committer: Carlos Sierra <[email protected]> Committed: Tue Oct 10 17:59:42 2017 +0200 ---------------------------------------------------------------------- .../aries/jax/rs/whiteboard/internal/Utils.java | 112 +++++++++---------- 1 file changed, 53 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/9b895c0d/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java index c4967cc..b373e20 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java @@ -19,6 +19,7 @@ package org.apache.aries.jax.rs.whiteboard.internal; import org.apache.aries.osgi.functional.Event; import org.apache.aries.osgi.functional.OSGi; +import org.apache.aries.osgi.functional.SentEvent; import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; import org.apache.cxf.message.Message; import org.osgi.framework.ServiceObjects; @@ -292,6 +293,7 @@ public class Utils { private final TreeSet<Event<T>> _set; private final Comparator<Event<T>> _comparator; + private SentEvent _sent; public HighestRankedRouter() { _comparator = Comparator.comparing(Event::getContent); @@ -303,21 +305,11 @@ public class Utils { public void accept(OSGi.Router<T> router) { router.onIncoming(ev -> { synchronized (_set) { - Event<T> last = _set.size() > 0 ? _set.last() : null; - - boolean higher = - (last == null) || - (_comparator.compare(ev, last) > 0); - - if (higher) { - if (last != null) { - router.signalLeave(last); - } + _set.add(ev); - router.signalAdd(ev); + if (ev == _set.last()) { + _sent = router.signalAdd(ev); } - - _set.add(ev); } }); router.onLeaving(ev -> { @@ -326,35 +318,23 @@ public class Utils { return; } - T content = ev.getContent(); - - Event<T> last = _set.last(); - - if (content.equals(last.getContent())) { - router.signalLeave(ev); + _set.remove(ev); - Event<T> penultimate = _set.lower(ev); - - if (penultimate != null) { - router.signalAdd(penultimate); - } + if (_sent != null && _sent.getEvent() == ev) { + _sent = null; } - _set.removeIf(t -> content.equals(t.getContent())); + if (!_set.isEmpty()) { + _sent = router.signalAdd(_set.last()); + } } }); router.onClose(() -> { - synchronized (_set) { - Iterator<Event<T>> iterator = _set.descendingIterator(); - - while (iterator.hasNext()) { - Event<T> event = iterator.next(); - - router.signalLeave(event); - - iterator.remove(); - } + if (_sent != null) { + _sent.terminate(); } + + _sent = null; }); } @@ -368,6 +348,7 @@ public class Utils { new ConcurrentHashMap<>(); private final Consumer<T> _onAddingShadowed; private final Consumer<T> _onRemovedShadowed; + private final Map<K, SentEvent<T>> _sentEvents = new HashMap<>(); public HighestPerRouter( Function<T, K> keySupplier, @@ -393,26 +374,26 @@ public class Utils { set = new TreeSet<>(comparator); } - Event<T> last = set.size() > 0 ? set.last() : null; + set.add(e); + + if (e == set.last()) { + SentEvent<T> oldEvent = _sentEvents.get(key); - boolean higher = - (last == null) || - (comparator.compare(e, last) > 0); + if (oldEvent != null) { + _sentEvents.remove(key); - if (higher) { - if (last != null) { - router.signalLeave(last); + oldEvent.terminate(); - _onAddingShadowed.accept(last.getContent()); + _onAddingShadowed.accept( + oldEvent.getEvent().getContent()); } - router.signalAdd(e); - } else { + _sentEvents.put(key, router.signalAdd(e)); + } + else { _onAddingShadowed.accept(e.getContent()); } - set.add(e); - return set; }); }); @@ -428,29 +409,42 @@ public class Utils { return set; } - Event<T> last = set.last(); + set.remove(e); - if (content.equals(last.getContent())) { - router.signalLeave(e); + _sentEvents.compute(key, (___, sentEvent) -> { + if (sentEvent.getEvent() == e) { + if (!set.isEmpty()) { + Event<T> last = set.last(); - Event<T> penultimate = set.lower(last); + SentEvent<T> event = router.signalAdd(last); - if (penultimate != null) { - router.signalAdd(penultimate); + _onRemovedShadowed.accept( + last.getContent()); - _onRemovedShadowed.accept( - penultimate.getContent()); + return event; + } + + return null; } - } else { - _onRemovedShadowed.accept(content); - } + else { + _onRemovedShadowed.accept(e.getContent()); + } + + return sentEvent; + }); + - set.removeIf(t -> t.getContent().equals(content)); return set; } ); }); + + router.onClose(() -> { + _sentEvents.values().forEach(SentEvent::terminate); + + _sentEvents.clear(); + }); } }
