Author: csierra
Date: Tue Oct 10 15:52:17 2017
New Revision: 1811730

URL: http://svn.apache.org/viewvc?rev=1811730&view=rev
Log:
[Component-DSL] Add atomic guards

Make sure that, if an effect has been executed, the corresponding
"counter effect" will be fired.

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Tue Oct 10 15:52:17 2017
@@ -54,20 +54,23 @@ public class OSGiImpl<T> implements OSGi
        public OSGi<Void> foreach(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
+               return OSGi.ignore(effects(onAdded, onRemoved));
+       }
+
+       @Override
+       public OSGi<T> effects(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
                return new OSGiImpl<>((bundleContext, op) ->
                        _operation.run(
                                bundleContext,
-               t -> {
-                       t.onTermination(() -> onRemoved.accept(t._t));
-
-                       onAdded.accept(t._t);
-
-                                       Tuple<Void> tuple = Tuple.create(null);
+                               t -> {
+                                       onAdded.accept(t._t);
 
-                                       t.addRelatedTuple(tuple);
+                                       op.accept(t);
 
-                                       op.accept(tuple);
-               }));
+                                       t.onTermination(() -> 
onRemoved.accept(t._t));
+                               }));
        }
 
        @Override
@@ -84,16 +87,7 @@ public class OSGiImpl<T> implements OSGi
        @Override
        public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) 
{
                OSGiResultImpl osgiResult =
-                       _operation.run(
-                               bundleContext,
-                               t -> {
-                                       if (!t.isClosed()) {
-                                               andThen.accept(t._t);
-                                       }
-                                       if (t.isClosed()) {
-                                               t.terminate();
-                                       }
-                               });
+                       _operation.run(bundleContext, t -> 
andThen.accept(t._t));
 
                osgiResult.start();
 
@@ -233,12 +227,9 @@ public class OSGiImpl<T> implements OSGi
                Consumer<Tuple<S>> addedSource, Tuple<Function<T, S>> fTuple,
                Tuple<T> t) {
 
-               S result = fTuple.getContent().apply(t.getContent());
-
-               Tuple<S> tuple = Tuple.create(result);
+               Tuple<S> tuple = t.map(fTuple.getContent());
 
                fTuple.addRelatedTuple(tuple);
-               t.addRelatedTuple(tuple);
 
                addedSource.accept(tuple);
        }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 Tue Oct 10 15:52:17 2017
@@ -19,16 +19,19 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class OSGiResultImpl implements OSGiResult {
 
-       public Runnable start;
-       public Runnable close;
+       private final Runnable start;
+       private final Runnable close;
+       private AtomicBoolean _working = new AtomicBoolean();
+       private AtomicBoolean _closed = new AtomicBoolean();
+       private volatile boolean _started = false;
+
 
        public OSGiResultImpl(Runnable start, Runnable close) {
                this.start = start;
@@ -37,12 +40,38 @@ public class OSGiResultImpl implements O
 
        @Override
        public void start() {
-               start.run();
+               if (_working.compareAndSet(false, true)) {
+
+                       if (!_started && !_closed.get()) {
+                               try {
+                                       start.run();
+
+                                       _started = true;
+                               }
+                               catch (Exception e) {
+                               }
+                       }
+
+                       _working.set(false);
+               }
+
        }
 
        @Override
        public void close() {
-               close.run();
+               while (!_working.compareAndSet(false, true)) {
+                       Thread.yield();
+               }
+
+               if (_closed.compareAndSet(false, true) && _started) {
+                       try {
+                               close.run();
+                       }
+                       catch (Exception e) {
+                       }
+               }
+
+               _working.set(false);
        }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 Tue Oct 10 15:52:17 2017
@@ -44,11 +44,11 @@ public class RouteOsgiImpl<T> extends OS
             return new OSGiResultImpl(
                 () -> {
                     router._start.run();
-                    osgiResult.start.run();
+                    osgiResult.start();
                 },
                 () -> {
                     router._close.run();
-                    osgiResult.close.run();
+                    osgiResult.close();
                 });
         });
     }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 Tue Oct 10 15:52:17 2017
@@ -24,6 +24,8 @@ import org.apache.aries.osgi.functional.
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 /**
@@ -32,25 +34,35 @@ import java.util.function.Function;
 class Tuple<T> implements Event<T> {
 
        public final T _t;
-       private final Deque<Runnable> _closingHandlers = new LinkedList<>();
+       private final Deque<Runnable> _closingHandlers =
+               new ConcurrentLinkedDeque<>();
        private final ConcurrentDoublyLinkedList<Tuple<?>> _relatedTuples =
                new ConcurrentDoublyLinkedList<>();
-       private volatile boolean _closed = false;
+       private AtomicBoolean _closed = new AtomicBoolean();
+       private AtomicBoolean _working = new AtomicBoolean();
 
        private Tuple(T t) {
                _t = t;
        }
 
        public void addRelatedTuple(Tuple<?> tuple) {
-               if (_closed) {
-                       tuple.terminate();
-
-                       return;
+               while (!_working.compareAndSet(false, true)) {
+                       Thread.yield();
                }
+               try {
+                       if (_closed.get()) {
+                tuple.terminate();
+
+                return;
+            }
 
-               ConcurrentDoublyLinkedList.Node node = 
_relatedTuples.addLast(tuple);
+                       ConcurrentDoublyLinkedList.Node node = 
_relatedTuples.addLast(tuple);
 
-               tuple.onTermination(node::remove);
+                       tuple.onTermination(node::remove);
+               }
+               finally {
+                       _working.set(false);
+               }
        }
 
        public static <T> Tuple<T> create(T t) {
@@ -73,7 +85,7 @@ class Tuple<T> implements Event<T> {
        }
 
        public boolean isClosed() {
-               return _closed;
+               return _closed.get();
        }
 
        public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
@@ -85,17 +97,35 @@ class Tuple<T> implements Event<T> {
        }
 
        public void onTermination(Runnable terminator) {
-               if (_closed) {
-                       terminator.run();
+               while (!_working.compareAndSet(false, true)) {
+                       Thread.yield();
+               }
+               try {
+                       if (_closed.get()) {
+                terminator.run();
+
+                return;
+            }
 
-                       return;
+                       _closingHandlers.push(terminator);
+               }
+               finally {
+                       _working.set(false);
                }
 
-               _closingHandlers.push(terminator);
        }
 
        public void terminate() {
-               _closed = true;
+               while (!_working.compareAndSet(false, true)) {
+               }
+               try {
+                       if (!_closed.compareAndSet(false, true)) {
+                return;
+            }
+               }
+               finally {
+                       _working.set(false);
+               }
 
                Iterator<Tuple<?>> iterator = _relatedTuples.iterator();
 
@@ -120,6 +150,7 @@ class Tuple<T> implements Event<T> {
                        catch (Exception e) {
                        }
                }
+
        }
 
 }


Reply via email to