Author: csierra
Date: Tue Oct 10 15:50:50 2017
New Revision: 1811720

URL: http://svn.apache.org/viewvc?rev=1811720&view=rev
Log:
[Component-DSL] Reimplement router

So it uses copies of the tuples and behaves in case the users invoke
signalAdding several times per tuple

Added:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingRouter.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1811720&r1=1811719&r2=1811720&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Tue Oct 10 15:50:50 2017
@@ -211,9 +211,7 @@ public interface OSGi<T> extends OSGiRun
                void onStart(Runnable start);
                void onClose(Runnable close);
 
-               void signalAdd(Event<T> event);
-               void signalLeave(Event<T> event);
-
+               SentEvent<T> signalAdd(Event<T> event);
        }
 
        public default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java?rev=1811720&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
 Tue Oct 10 15:50:50 2017
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public interface SentEvent<T> {
+
+    Event<T> getEvent();
+
+    void terminate();
+}

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811720&r1=1811719&r2=1811720&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 Tue Oct 10 15:50:50 2017
@@ -18,6 +18,7 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.Event;
+import org.apache.aries.osgi.functional.SentEvent;
 
 import java.util.function.Consumer;
 
@@ -43,15 +44,9 @@ public class RouteOsgiImpl<T> extends OS
 
             osgiResult.added.map(
                 t -> {
-                    Tuple<T> copy = Tuple.create(t.t);
+                    router._adding.accept(t);
 
-                    t.onTermination(() -> {
-                        router._leaving.accept(copy);
-
-                        copy.terminate();
-                    });
-
-                    router._adding.accept(copy);
+                    t.onTermination(() -> router._leaving.accept(t));
 
                     return null;
                 });
@@ -96,13 +91,18 @@ public class RouteOsgiImpl<T> extends OS
         }
 
         @Override
-        public void signalAdd(Event<T> event) {
-            _signalAdding.accept((Tuple<T>) event);
-        }
+        public SentEvent<T> signalAdd(Event<T> event) {
+            Tuple<T> tuple = (Tuple<T>) event;
 
-        @Override
-        public void signalLeave(Event<T> event) {
-            ((Tuple<T>)event).terminate();
+            Tuple<T> copy = Tuple.create(tuple.t);
+
+            tuple.addRelatedTuple(copy);
+
+            copy.setEvent(tuple);
+
+            _signalAdding.accept(copy);
+
+            return copy;
         }
 
         Consumer<Event<T>> _adding = (ign) -> {};

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811720&r1=1811719&r2=1811720&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 Tue Oct 10 15:50:50 2017
@@ -19,23 +19,26 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.Event;
+import org.apache.aries.osgi.functional.SentEvent;
 
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
  * @author Carlos Sierra Andrés
  */
-class Tuple<T> implements Event<T> {
+class Tuple<T> implements Event<T>, SentEvent<T> {
 
        public T t;
        private Deque<Runnable> _closingHandlers = new LinkedList<>();
        private DoublyLinkedList<Tuple<?>> _relatedTuples =
                new DoublyLinkedList<>();
        private AtomicBoolean closed = new AtomicBoolean(false);
+       private Event<T> cause = this;
 
        private Tuple(T t) {
                this(t, new LinkedList<>());
@@ -43,6 +46,7 @@ class Tuple<T> implements Event<T> {
 
        private Tuple(T t, Deque<Runnable> closingHandlers) {
                this.t = t;
+
                _closingHandlers = closingHandlers;
        }
 
@@ -92,6 +96,15 @@ class Tuple<T> implements Event<T> {
                _closingHandlers.push(terminator);
        }
 
+       public void setEvent(Event<T> event) {
+               this.cause = event;
+       }
+
+       @Override
+       public Event<T> getEvent() {
+               return cause;
+       }
+
        public void terminate() {
                if (!closed.compareAndSet(false, true)) {
                        return;

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingRouter.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingRouter.java?rev=1811720&r1=1811719&r2=1811720&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingRouter.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingRouter.java
 Tue Oct 10 15:50:50 2017
@@ -18,6 +18,7 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.Event;
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.SentEvent;
 import org.osgi.framework.ServiceReference;
 
 import java.util.Comparator;
@@ -33,6 +34,7 @@ public class HighestRankingRouter<T exte
     implements Consumer<OSGi.Router<T>> {
 
     private PriorityQueue<Event<T>> _instances;
+    private SentEvent sent;
 
     public static <T> OSGi<ServiceReference<T>> highest(Class<T> clazz) {
         return serviceReferences(clazz).route(new HighestRankingRouter<>());
@@ -41,16 +43,14 @@ public class HighestRankingRouter<T exte
     @Override
     public void accept(OSGi.Router<T> router) {
         router.onIncoming(sr -> {
-            Event<T> old = _instances.peek();
-
             _instances.add(sr);
 
             if (_instances.peek() == sr) {
-                if (old != null) {
-                    router.signalLeave(old);
+                if (sent != null) {
+                    sent.terminate();
                 }
 
-                router.signalAdd(sr);
+                sent = router.signalAdd(sr);
             }
 
         });
@@ -63,10 +63,10 @@ public class HighestRankingRouter<T exte
             Event<T> current = _instances.peek();
 
             if (current != old) {
-                router.signalLeave(old);
+                sent.terminate();
 
                 if (current != null) {
-                    router.signalAdd(current);
+                    sent = router.signalAdd(current);
                 }
             }
         });
@@ -77,7 +77,9 @@ public class HighestRankingRouter<T exte
                     reversed()));
 
         router.onClose(() -> {
-            _instances.forEach(router::signalLeave);
+            if (sent != null) {
+                sent.terminate();
+            }
 
             _instances.clear();
         });


Reply via email to