Use new primitives in component-dsl

No need to implement the handling of the highest references here
anymore.


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/e6dfab21
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/e6dfab21
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/e6dfab21

Branch: refs/heads/master
Commit: e6dfab21c3ba99e1a6eb77bc27f467770a7da646
Parents: a1dc538
Author: Carlos Sierra <[email protected]>
Authored: Fri Nov 17 16:55:07 2017 +0100
Committer: Carlos Sierra <[email protected]>
Committed: Fri Nov 17 16:55:07 2017 +0100

----------------------------------------------------------------------
 .../aries/jax/rs/whiteboard/internal/Utils.java | 194 ++-----------------
 1 file changed, 13 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/e6dfab21/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
index 70f12b2..c7b271b 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
@@ -17,10 +17,8 @@
 
 package org.apache.aries.jax.rs.whiteboard.internal;
 
-import org.apache.aries.osgi.functional.Event;
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
 import org.apache.cxf.message.Message;
 import org.osgi.framework.ServiceObjects;
@@ -33,9 +31,6 @@ import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -43,6 +38,7 @@ import static org.apache.aries.osgi.functional.OSGi.bundleContext;
 import static org.apache.aries.osgi.functional.OSGi.just;
 import static org.apache.aries.osgi.functional.OSGi.nothing;
 import static org.apache.aries.osgi.functional.OSGi.onClose;
+import static org.apache.aries.osgi.functional.Utils.highest;
 
 /**
  * @author Carlos Sierra Andrés
@@ -117,11 +113,18 @@ public class Utils {
 
     public static <K, T extends Comparable<? super T>> OSGi<T> highestPer(
         Function<T, K> keySupplier, OSGi<T> program,
-        Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) {
-
-        return program.route(
-            new HighestPerRouter<>(
-                keySupplier, onAddingShadowed, onRemovedShadowed));
+        Consumer<? super T> onAddingShadowed,
+        Consumer<? super T> onRemovedShadowed) {
+
+        return program.splitBy(
+            keySupplier,
+            p -> highest(
+                p, Comparator.naturalOrder(),
+                discards -> discards.
+                    effects(onAddingShadowed, onRemovedShadowed).
+                    then(nothing())
+            )
+        );
     }
 
     public static <T> OSGi<ServiceTuple<T>> onlyGettables(
@@ -160,12 +163,6 @@ public class Utils {
             }));
     }
 
-    public static <T extends Comparable<? super T>> OSGi<T> highestRanked(
-        OSGi<T> program) {
-
-        return program.route(new HighestRankedRouter<>());
-    }
-
     public static <T> OSGi<T> service(
         CachingServiceReference<T> immutableServiceReference) {
 
@@ -313,169 +310,4 @@ public class Utils {
 
     }
 
-    private static class HighestRankedRouter<T extends Comparable<? super T>>
-        implements Consumer<OSGi.Router<T>> {
-
-        private final Comparator<Event<T>> _comparator;
-
-        public HighestRankedRouter() {
-            _comparator = Comparator.comparing(Event::getContent);
-        }
-
-        @Override
-        public void accept(OSGi.Router<T> router) {
-            AtomicReference<SentEvent<T>> _sent = new AtomicReference<>();
-            final TreeSet<Event<T>> _set = new TreeSet<>(_comparator);
-
-            router.onIncoming(ev -> {
-                synchronized (_set) {
-                    _set.add(ev);
-
-                    if (ev == _set.last()) {
-                        _sent.set(router.signalAdd(ev));
-                    }
-                }
-            });
-            router.onLeaving(ev -> {
-                synchronized (_set) {
-                    if (_set.isEmpty()) {
-                        return;
-                    }
-
-                    _set.remove(ev);
-
-                    SentEvent<T> sent = _sent.get();
-
-                    if (sent != null && sent.getEvent() == ev) {
-                        sent.terminate();
-
-                        _sent.set(null);
-                    }
-
-                    if (!_set.isEmpty()) {
-                        _sent.set(router.signalAdd(_set.last()));
-                    }
-                }
-            });
-        }
-
-    }
-
-    private static class HighestPerRouter<T extends Comparable<? super T>, K>
-        implements Consumer<OSGi.Router<T>> {
-
-        private final Function<T, K> _keySupplier;
-        private final Consumer<T> _onAddingShadowed;
-        private final Consumer<T> _onRemovedShadowed;
-
-        public HighestPerRouter(
-            Function<T, K> keySupplier,
-            Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) {
-
-            _keySupplier = keySupplier;
-            _onAddingShadowed = onAddingShadowed;
-            _onRemovedShadowed = onRemovedShadowed;
-        }
-
-        @Override
-        public void accept(OSGi.Router<T> router) {
-            /**
-             * These can't be fields on the class or they would be shared among
-             * invocations to OSGiResult.run :-O
-             */
-            final ConcurrentHashMap<K, TreeSet<Event<T>>> _map =
-                new ConcurrentHashMap<>();
-
-            final Map<K, SentEvent<T>> _sentEvents = new HashMap<>();
-
-            router.onIncoming(e -> {
-                K key = _keySupplier.apply(e.getContent());
-
-                Comparator<Event<T>> comparator = Comparator.comparing(
-                    Event::getContent);
-
-                _map.compute(
-                    key,
-                    (__, set) -> {
-                        if (set == null) {
-                            set = new TreeSet<>(comparator);
-                        }
-
-                        set.add(e);
-
-                        if (e == set.last()) {
-                            SentEvent<T> oldEvent = _sentEvents.get(key);
-
-                            if (oldEvent != null) {
-                                _sentEvents.remove(key);
-
-                                oldEvent.terminate();
-
-                                _onAddingShadowed.accept(
-                                    oldEvent.getEvent().getContent());
-                            }
-
-                            _sentEvents.put(key, router.signalAdd(e));
-                        }
-                        else {
-                            _onAddingShadowed.accept(e.getContent());
-                        }
-
-                        return set;
-                    });
-            });
-            router.onLeaving(e -> {
-                T content = e.getContent();
-
-                K key = _keySupplier.apply(content);
-
-                _map.compute(
-                    key,
-                    (__, set) -> {
-                        if (set.isEmpty()) {
-                            return set;
-                        }
-
-                        set.remove(e);
-
-                        _sentEvents.compute(key, (___, sentEvent) -> {
-                            if (sentEvent != null && sentEvent.getEvent() == e) {
-                                sentEvent.terminate();
-
-                                if (!set.isEmpty()) {
-                                    Event<T> last = set.last();
-
-                                    SentEvent<T> event = router.signalAdd(last);
-
-                                    _onRemovedShadowed.accept(
-                                        last.getContent());
-
-                                    return event;
-                                }
-
-                                return null;
-                            }
-                            else {
-                                _onRemovedShadowed.accept(e.getContent());
-                            }
-
-                            return sentEvent;
-                        });
-
-
-
-                        return set;
-                    }
-                );
-            });
-
-            router.onClose(() -> {
-                _sentEvents.values().forEach(SentEvent::terminate);
-
-                _sentEvents.clear();
-            });
-        }
-
-    }
-
 }

Reply via email to