Repository: aries-jax-rs-whiteboard Updated Branches: refs/heads/master d5bdc7673 -> 2cf7f7f0e
RepeatInOrder not needed Also code reorganization to make it more consistent Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/638ebef6 Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/638ebef6 Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/638ebef6 Branch: refs/heads/master Commit: 638ebef6010e7058b8353a195d124b6cfab401d7 Parents: d5bdc76 Author: Carlos Sierra <[email protected]> Authored: Mon Sep 4 12:35:30 2017 +0200 Committer: Carlos Sierra <[email protected]> Committed: Mon Sep 4 12:35:30 2017 +0200 ---------------------------------------------------------------------- .../aries/jax/rs/whiteboard/internal/Utils.java | 256 +++++++++++-------- 1 file changed, 152 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/638ebef6/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java index c39fa82..1e5d15a 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java @@ -26,8 +26,8 @@ import org.osgi.framework.ServiceReference; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -73,7 +73,8 @@ public class Utils { public static <T> ResourceProvider getResourceProvider( ServiceObjects<T> serviceObjects) { - ServiceReference<T> serviceReference = serviceObjects.getServiceReference(); + ServiceReference<T> serviceReference = + serviceObjects.getServiceReference(); return new ComparableResourceProvider(serviceReference, serviceObjects); } @@ -82,85 +83,9 @@ public class Utils { Function<T, K> keySupplier, OSGi<T> program, Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) { - ConcurrentHashMap<K, TreeSet<Event<T>>> map = new ConcurrentHashMap<>(); - return program.route( - router -> { - router.onIncoming(e -> { - K key = keySupplier.apply(e.getContent()); - - Comparator<Event<T>> comparator = Comparator.comparing( - Event::getContent); - - map.compute( - key, - (__, set) -> { - if (set == null) { - set = new TreeSet<>(comparator); - } - - Event<T> last = set.size() > 0 ? set.last() : null; - - boolean higher = - (last == null) || - (comparator.compare(e, last) > 0); - - if (higher) { - if (last != null) { - router.signalLeave(last); - - onAddingShadowed.accept(last.getContent()); - } - - router.signalAdd(e); - } - else { - onAddingShadowed.accept(e.getContent()); - } - - set.add(e); - - return set; - }); - }); - router.onLeaving(e -> { - T content = e.getContent(); - - K key = keySupplier.apply(content); - - map.compute( - key, - (__, set) -> { - if (set.isEmpty()) { - return set; - } - - Event<T> last = set.last(); - - if (content.equals(last.getContent())) { - router.signalLeave(e); - - Event<T> penultimate = set.lower(last); - - if (penultimate != null) { - router.signalAdd(penultimate); - - onRemovedShadowed.accept( - penultimate.getContent()); - } - } - else { - onRemovedShadowed.accept(content); - } - - set.removeIf(t -> t.getContent().equals(content)); - - return set; - } - ); - }); - } - ); + new HighestPerRouter<>( + keySupplier, onAddingShadowed, onRemovedShadowed)); } public static <T> OSGi<ServiceTuple<T>> onlyGettables( @@ -200,7 +125,7 @@ public class Utils { public static <T extends Comparable<? super T>> OSGi<T> repeatInOrder( OSGi<T> program) { - return program.route(new RepeatInOrderRouter<>()); + return program.route(new HighestRankedRouter<>()); } public static <T> OSGi<T> service(ServiceReference<T> serviceReference) { @@ -303,16 +228,11 @@ public class Utils { public ServiceReference<T> getServiceReference() { return _serviceReference; - } @Override - public int compareTo(ServiceTuple<T> o) { - return _serviceReference.compareTo(o._serviceReference); } - @Override public int hashCode() { return _serviceReference.hashCode(); } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -321,50 +241,178 @@ public class Utils { ServiceTuple<?> that = (ServiceTuple<?>) o; return _serviceReference.equals(that._serviceReference); + }@Override + public int compareTo(ServiceTuple<T> o) { + return _serviceReference.compareTo(o._serviceReference); } } - private static class RepeatInOrderRouter<T extends Comparable<? super T>> + private static class HighestRankedRouter<T extends Comparable<? super T>> implements Consumer<OSGi.Router<T>> { - private final TreeSet<Event<T>> _treeSet; + private final TreeSet<Event<T>> _set; + private final Comparator<Event<T>> _comparator; - public RepeatInOrderRouter() { - Comparator<Event<T>> comparing = Comparator.comparing( - Event::getContent); + public HighestRankedRouter() { + _comparator = Comparator.comparing(Event::getContent); - _treeSet = new TreeSet<>(comparing.reversed()); + _set = new TreeSet<>(_comparator); } @Override public void accept(OSGi.Router<T> router) { router.onIncoming(ev -> { - _treeSet.add(ev); + synchronized (_set) { + Event<T> last = _set.size() > 0 ? _set.last() : null; + + boolean higher = + (last == null) || + (_comparator.compare(ev, last) > 0); - SortedSet<Event<T>> events = _treeSet.tailSet(ev, false); - events.forEach(router::signalLeave); + if (higher) { + if (last != null) { + router.signalLeave(last); + } - router.signalAdd(ev); + router.signalAdd(ev); + } - events.forEach(router::signalAdd); + _set.add(ev); + } }); router.onLeaving(ev -> { - _treeSet.remove(ev); + synchronized (_set) { + if (_set.isEmpty()) { + return; + } + + T content = ev.getContent(); - SortedSet<Event<T>> events = _treeSet.tailSet(ev, false); - events.forEach(router::signalLeave); + Event<T> last = _set.last(); - router.signalLeave(ev); + if (content.equals(last.getContent())) { + router.signalLeave(ev); - events.forEach(router::signalAdd); + Event<T> penultimate = _set.lower(ev); + + if (penultimate != null) { + router.signalAdd(penultimate); + } + } + + _set.removeIf(t -> content.equals(t.getContent())); + } }); router.onClose(() -> { - _treeSet.forEach(router::signalLeave); + synchronized (_set) { + Iterator<Event<T>> iterator = _set.descendingIterator(); + + while (iterator.hasNext()) { + Event<T> event = iterator.next(); + + router.signalLeave(event); + + iterator.remove(); + } + } + }); + } + + } + + private static class HighestPerRouter<T extends Comparable<? super T>, K> + implements Consumer<OSGi.Router<T>> { + + private final Function<T, K> _keySupplier; + private final ConcurrentHashMap<K, TreeSet<Event<T>>> _map = + new ConcurrentHashMap<>(); + private final Consumer<T> _onAddingShadowed; + private final Consumer<T> _onRemovedShadowed; + + public HighestPerRouter( + Function<T, K> keySupplier, + Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) { + + _keySupplier = keySupplier; + _onAddingShadowed = onAddingShadowed; + _onRemovedShadowed = onRemovedShadowed; + } + + @Override + public void accept(OSGi.Router<T> router) { + router.onIncoming(e -> { + K key = _keySupplier.apply(e.getContent()); + + Comparator<Event<T>> comparator = Comparator.comparing( + Event::getContent); + + _map.compute( + key, + (__, set) -> { + if (set == null) { + set = new TreeSet<>(comparator); + } + + Event<T> last = set.size() > 0 ? set.last() : null; + + boolean higher = + (last == null) || + (comparator.compare(e, last) > 0); + + if (higher) { + if (last != null) { + router.signalLeave(last); + + _onAddingShadowed.accept(last.getContent()); + } + + router.signalAdd(e); + } else { + _onAddingShadowed.accept(e.getContent()); + } + + set.add(e); + + return set; + }); + }); + router.onLeaving(e -> { + T content = e.getContent(); + + K key = _keySupplier.apply(content); + + _map.compute( + key, + (__, set) -> { + if (set.isEmpty()) { + return set; + } + + Event<T> last = set.last(); + + if (content.equals(last.getContent())) { + router.signalLeave(e); + + Event<T> penultimate = set.lower(last); + + if (penultimate != null) { + router.signalAdd(penultimate); + + _onRemovedShadowed.accept( + penultimate.getContent()); + } + } else { + _onRemovedShadowed.accept(content); + } + + set.removeIf(t -> t.getContent().equals(content)); - _treeSet.clear(); + return set; + } + ); }); }
