Repository: aries-jax-rs-whiteboard
Updated Branches:
  refs/heads/master d5bdc7673 -> 2cf7f7f0e


RepeatInOrder not needed

Also code reorganization to make it more consistent


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/638ebef6
Tree: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/638ebef6
Diff: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/638ebef6

Branch: refs/heads/master
Commit: 638ebef6010e7058b8353a195d124b6cfab401d7
Parents: d5bdc76
Author: Carlos Sierra <[email protected]>
Authored: Mon Sep 4 12:35:30 2017 +0200
Committer: Carlos Sierra <[email protected]>
Committed: Mon Sep 4 12:35:30 2017 +0200

----------------------------------------------------------------------
 .../aries/jax/rs/whiteboard/internal/Utils.java | 256 +++++++++++--------
 1 file changed, 152 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/638ebef6/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
index c39fa82..1e5d15a 100644
--- 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
@@ -26,8 +26,8 @@ import org.osgi.framework.ServiceReference;
 
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
@@ -73,7 +73,8 @@ public class Utils {
     public static <T> ResourceProvider getResourceProvider(
         ServiceObjects<T> serviceObjects) {
 
-        ServiceReference<T> serviceReference = 
serviceObjects.getServiceReference();
+        ServiceReference<T> serviceReference =
+            serviceObjects.getServiceReference();
 
         return new ComparableResourceProvider(serviceReference, 
serviceObjects);
     }
@@ -82,85 +83,9 @@ public class Utils {
         Function<T, K> keySupplier, OSGi<T> program,
         Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) {
 
-        ConcurrentHashMap<K, TreeSet<Event<T>>> map = new 
ConcurrentHashMap<>();
-
         return program.route(
-            router -> {
-                router.onIncoming(e -> {
-                    K key = keySupplier.apply(e.getContent());
-
-                    Comparator<Event<T>> comparator = Comparator.comparing(
-                        Event::getContent);
-
-                    map.compute(
-                        key,
-                        (__, set) -> {
-                            if (set == null) {
-                                set = new TreeSet<>(comparator);
-                            }
-
-                            Event<T> last = set.size() > 0 ? set.last() : null;
-
-                            boolean higher =
-                                (last == null) ||
-                                (comparator.compare(e, last) > 0);
-
-                            if (higher) {
-                                if (last != null) {
-                                    router.signalLeave(last);
-
-                                    onAddingShadowed.accept(last.getContent());
-                                }
-
-                                router.signalAdd(e);
-                            }
-                            else {
-                                onAddingShadowed.accept(e.getContent());
-                            }
-
-                            set.add(e);
-
-                            return set;
-                        });
-                });
-                router.onLeaving(e -> {
-                    T content = e.getContent();
-
-                    K key = keySupplier.apply(content);
-
-                    map.compute(
-                        key,
-                        (__, set) -> {
-                            if (set.isEmpty()) {
-                                return set;
-                            }
-
-                            Event<T> last = set.last();
-
-                            if (content.equals(last.getContent())) {
-                                router.signalLeave(e);
-
-                                Event<T> penultimate = set.lower(last);
-
-                                if (penultimate != null) {
-                                    router.signalAdd(penultimate);
-
-                                    onRemovedShadowed.accept(
-                                        penultimate.getContent());
-                                }
-                            }
-                            else {
-                                onRemovedShadowed.accept(content);
-                            }
-
-                            set.removeIf(t -> t.getContent().equals(content));
-
-                            return set;
-                        }
-                    );
-                });
-            }
-        );
+            new HighestPerRouter<>(
+                keySupplier, onAddingShadowed, onRemovedShadowed));
     }
 
     public static <T> OSGi<ServiceTuple<T>> onlyGettables(
@@ -200,7 +125,7 @@ public class Utils {
     public static <T extends Comparable<? super T>> OSGi<T> repeatInOrder(
         OSGi<T> program) {
 
-        return program.route(new RepeatInOrderRouter<>());
+        return program.route(new HighestRankedRouter<>());
     }
 
     public static <T> OSGi<T> service(ServiceReference<T> serviceReference) {
@@ -303,16 +228,11 @@ public class Utils {
 
         public ServiceReference<T> getServiceReference() {
             return _serviceReference;
-        }        @Override
-        public int compareTo(ServiceTuple<T> o) {
-            return _serviceReference.compareTo(o._serviceReference);
         }
-
         @Override
         public int hashCode() {
             return _serviceReference.hashCode();
         }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -321,50 +241,178 @@ public class Utils {
             ServiceTuple<?> that = (ServiceTuple<?>) o;
 
             return _serviceReference.equals(that._serviceReference);
+        }@Override
+        public int compareTo(ServiceTuple<T> o) {
+            return _serviceReference.compareTo(o._serviceReference);
         }
 
 
 
     }
 
-    private static class RepeatInOrderRouter<T extends Comparable<? super T>>
+    private static class HighestRankedRouter<T extends Comparable<? super T>>
         implements Consumer<OSGi.Router<T>> {
 
-        private final TreeSet<Event<T>> _treeSet;
+        private final TreeSet<Event<T>> _set;
+        private final Comparator<Event<T>> _comparator;
 
-        public RepeatInOrderRouter() {
-            Comparator<Event<T>> comparing = Comparator.comparing(
-                Event::getContent);
+        public HighestRankedRouter() {
+            _comparator = Comparator.comparing(Event::getContent);
 
-            _treeSet = new TreeSet<>(comparing.reversed());
+            _set = new TreeSet<>(_comparator);
         }
 
         @Override
         public void accept(OSGi.Router<T> router) {
             router.onIncoming(ev -> {
-                _treeSet.add(ev);
+                synchronized (_set) {
+                    Event<T> last = _set.size() > 0 ? _set.last() : null;
+
+                    boolean higher =
+                        (last == null) ||
+                            (_comparator.compare(ev, last) > 0);
 
-                SortedSet<Event<T>> events = _treeSet.tailSet(ev, false);
-                events.forEach(router::signalLeave);
+                    if (higher) {
+                        if (last != null) {
+                            router.signalLeave(last);
+                        }
 
-                router.signalAdd(ev);
+                        router.signalAdd(ev);
+                    }
 
-                events.forEach(router::signalAdd);
+                    _set.add(ev);
+                }
             });
             router.onLeaving(ev -> {
-                _treeSet.remove(ev);
+                synchronized (_set) {
+                    if (_set.isEmpty()) {
+                        return;
+                    }
+
+                    T content = ev.getContent();
 
-                SortedSet<Event<T>> events = _treeSet.tailSet(ev, false);
-                events.forEach(router::signalLeave);
+                    Event<T> last = _set.last();
 
-                router.signalLeave(ev);
+                    if (content.equals(last.getContent())) {
+                        router.signalLeave(ev);
 
-                events.forEach(router::signalAdd);
+                        Event<T> penultimate = _set.lower(ev);
+
+                        if (penultimate != null) {
+                            router.signalAdd(penultimate);
+                        }
+                    }
+
+                    _set.removeIf(t -> content.equals(t.getContent()));
+                }
             });
             router.onClose(() -> {
-                _treeSet.forEach(router::signalLeave);
+                synchronized (_set) {
+                    Iterator<Event<T>> iterator = _set.descendingIterator();
+
+                    while (iterator.hasNext()) {
+                        Event<T> event = iterator.next();
+
+                        router.signalLeave(event);
+
+                        iterator.remove();
+                    }
+                }
+            });
+        }
+
+    }
+
+    private static class HighestPerRouter<T extends Comparable<? super T>, K>
+        implements Consumer<OSGi.Router<T>> {
+
+        private final Function<T, K> _keySupplier;
+        private final ConcurrentHashMap<K, TreeSet<Event<T>>> _map =
+            new ConcurrentHashMap<>();
+        private final Consumer<T> _onAddingShadowed;
+        private final Consumer<T> _onRemovedShadowed;
+
+        public HighestPerRouter(
+            Function<T, K> keySupplier,
+            Consumer<T> onAddingShadowed, Consumer<T> onRemovedShadowed) {
+
+            _keySupplier = keySupplier;
+            _onAddingShadowed = onAddingShadowed;
+            _onRemovedShadowed = onRemovedShadowed;
+        }
+
+        @Override
+        public void accept(OSGi.Router<T> router) {
+            router.onIncoming(e -> {
+                K key = _keySupplier.apply(e.getContent());
+
+                Comparator<Event<T>> comparator = Comparator.comparing(
+                    Event::getContent);
+
+                _map.compute(
+                    key,
+                    (__, set) -> {
+                        if (set == null) {
+                            set = new TreeSet<>(comparator);
+                        }
+
+                        Event<T> last = set.size() > 0 ? set.last() : null;
+
+                        boolean higher =
+                            (last == null) ||
+                                (comparator.compare(e, last) > 0);
+
+                        if (higher) {
+                            if (last != null) {
+                                router.signalLeave(last);
+
+                                _onAddingShadowed.accept(last.getContent());
+                            }
+
+                            router.signalAdd(e);
+                        } else {
+                            _onAddingShadowed.accept(e.getContent());
+                        }
+
+                        set.add(e);
+
+                        return set;
+                    });
+            });
+            router.onLeaving(e -> {
+                T content = e.getContent();
+
+                K key = _keySupplier.apply(content);
+
+                _map.compute(
+                    key,
+                    (__, set) -> {
+                        if (set.isEmpty()) {
+                            return set;
+                        }
+
+                        Event<T> last = set.last();
+
+                        if (content.equals(last.getContent())) {
+                            router.signalLeave(e);
+
+                            Event<T> penultimate = set.lower(last);
+
+                            if (penultimate != null) {
+                                router.signalAdd(penultimate);
+
+                                _onRemovedShadowed.accept(
+                                    penultimate.getContent());
+                            }
+                        } else {
+                            _onRemovedShadowed.accept(content);
+                        }
+
+                        set.removeIf(t -> t.getContent().equals(content));
 
-                _treeSet.clear();
+                        return set;
+                    }
+                );
             });
         }
 

Reply via email to