Use new primitives in component-dsl. No need to implement the handling of the highest references here anymore.
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/e6dfab21 Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/e6dfab21 Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/e6dfab21 Branch: refs/heads/master Commit: e6dfab21c3ba99e1a6eb77bc27f467770a7da646 Parents: a1dc538 Author: Carlos Sierra <[email protected]> Authored: Fri Nov 17 16:55:07 2017 +0100 Committer: Carlos Sierra <[email protected]> Committed: Fri Nov 17 16:55:07 2017 +0100 ---------------------------------------------------------------------- .../aries/jax/rs/whiteboard/internal/Utils.java | 194 ++----------------- 1 file changed, 13 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/e6dfab21/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java index 70f12b2..c7b271b 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java @@ -17,10 +17,8 @@ package org.apache.aries.jax.rs.whiteboard.internal; -import org.apache.aries.osgi.functional.Event; import org.apache.aries.osgi.functional.CachingServiceReference; import org.apache.aries.osgi.functional.OSGi; -import org.apache.aries.osgi.functional.SentEvent; import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; import org.apache.cxf.message.Message; import org.osgi.framework.ServiceObjects; @@ -33,9 +31,6 @@ import java.util.Dictionary; import java.util.HashMap; import 
java.util.Hashtable; import java.util.Map; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -43,6 +38,7 @@ import static org.apache.aries.osgi.functional.OSGi.bundleContext; import static org.apache.aries.osgi.functional.OSGi.just; import static org.apache.aries.osgi.functional.OSGi.nothing; import static org.apache.aries.osgi.functional.OSGi.onClose; +import static org.apache.aries.osgi.functional.Utils.highest; /** * @author Carlos Sierra Andrés @@ -117,11 +113,18 @@ public class Utils { public static <K, T extends Comparable<? super T>> OSGi<T> highestPer( Function<T, K> keySupplier, OSGi<T> program, - Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) { - - return program.route( - new HighestPerRouter<>( - keySupplier, onAddingShadowed, onRemovedShadowed)); + Consumer<? super T> onAddingShadowed, + Consumer<? super T> onRemovedShadowed) { + + return program.splitBy( + keySupplier, + p -> highest( + p, Comparator.naturalOrder(), + discards -> discards. + effects(onAddingShadowed, onRemovedShadowed). + then(nothing()) + ) + ); } public static <T> OSGi<ServiceTuple<T>> onlyGettables( @@ -160,12 +163,6 @@ public class Utils { })); } - public static <T extends Comparable<? super T>> OSGi<T> highestRanked( - OSGi<T> program) { - - return program.route(new HighestRankedRouter<>()); - } - public static <T> OSGi<T> service( CachingServiceReference<T> immutableServiceReference) { @@ -313,169 +310,4 @@ public class Utils { } - private static class HighestRankedRouter<T extends Comparable<? 
super T>> - implements Consumer<OSGi.Router<T>> { - - private final Comparator<Event<T>> _comparator; - - public HighestRankedRouter() { - _comparator = Comparator.comparing(Event::getContent); - } - - @Override - public void accept(OSGi.Router<T> router) { - AtomicReference<SentEvent<T>> _sent = new AtomicReference<>(); - final TreeSet<Event<T>> _set = new TreeSet<>(_comparator); - - router.onIncoming(ev -> { - synchronized (_set) { - _set.add(ev); - - if (ev == _set.last()) { - _sent.set(router.signalAdd(ev)); - } - } - }); - router.onLeaving(ev -> { - synchronized (_set) { - if (_set.isEmpty()) { - return; - } - - _set.remove(ev); - - SentEvent<T> sent = _sent.get(); - - if (sent != null && sent.getEvent() == ev) { - sent.terminate(); - - _sent.set(null); - } - - if (!_set.isEmpty()) { - _sent.set(router.signalAdd(_set.last())); - } - } - }); - } - - } - - private static class HighestPerRouter<T extends Comparable<? super T>, K> - implements Consumer<OSGi.Router<T>> { - - private final Function<T, K> _keySupplier; - private final Consumer<T> _onAddingShadowed; - private final Consumer<T> _onRemovedShadowed; - - public HighestPerRouter( - Function<T, K> keySupplier, - Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) { - - _keySupplier = keySupplier; - _onAddingShadowed = onAddingShadowed; - _onRemovedShadowed = onRemovedShadowed; - } - - @Override - public void accept(OSGi.Router<T> router) { - /** - * These can't be fields on the class or they would be shared among - * invocations to OSGiResult.run :-O - */ - final ConcurrentHashMap<K, TreeSet<Event<T>>> _map = - new ConcurrentHashMap<>(); - - final Map<K, SentEvent<T>> _sentEvents = new HashMap<>(); - - router.onIncoming(e -> { - K key = _keySupplier.apply(e.getContent()); - - Comparator<Event<T>> comparator = Comparator.comparing( - Event::getContent); - - _map.compute( - key, - (__, set) -> { - if (set == null) { - set = new TreeSet<>(comparator); - } - - set.add(e); - - if (e == set.last()) { - 
SentEvent<T> oldEvent = _sentEvents.get(key); - - if (oldEvent != null) { - _sentEvents.remove(key); - - oldEvent.terminate(); - - _onAddingShadowed.accept( - oldEvent.getEvent().getContent()); - } - - _sentEvents.put(key, router.signalAdd(e)); - } - else { - _onAddingShadowed.accept(e.getContent()); - } - - return set; - }); - }); - router.onLeaving(e -> { - T content = e.getContent(); - - K key = _keySupplier.apply(content); - - _map.compute( - key, - (__, set) -> { - if (set.isEmpty()) { - return set; - } - - set.remove(e); - - _sentEvents.compute(key, (___, sentEvent) -> { - if (sentEvent != null && sentEvent.getEvent() == e) { - sentEvent.terminate(); - - if (!set.isEmpty()) { - Event<T> last = set.last(); - - SentEvent<T> event = router.signalAdd(last); - - _onRemovedShadowed.accept( - last.getContent()); - - return event; - } - - return null; - } - else { - _onRemovedShadowed.accept(e.getContent()); - } - - return sentEvent; - }); - - - - return set; - } - ); - }); - - router.onClose(() -> { - _sentEvents.values().forEach(SentEvent::terminate); - - _sentEvents.clear(); - }); - } - - } - }
