Author: csierra
Date: Fri Nov 17 16:00:14 2017
New Revision: 1815579

URL: http://svn.apache.org/viewvc?rev=1815579&view=rev
Log:
[Component-DSL] Add new branching primitives

These branching primitives will allow to discriminate according to the
incoming values and generate different branches that will be able to
keep their own state, like aggregations or highest values.

The difference with a flatMap is that the latter always _spawns_ new
producers though making it impossible (at least with the current
implementation) to accumulate the highest service references
discriminated by some property.

I don't know if the fact that I had to introduce these primitives is a
signal that I am missing some generalization that could make all these
primitives subject to be abstracted out.

Added:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AllOSGi.java
      - copied, changed from r1815578, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java
      - copied, changed from r1815578, 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingTransformer.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
      - copied, changed from r1815578, 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Removed:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingTransformer.java
Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java?rev=1815579&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
 Fri Nov 17 16:00:14 2017
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class HighestsPerTransformer<T, K> implements Transformer<T, List<T>> {
+
+    public HighestsPerTransformer(
+        Function<T, K> mapper, Comparator<T> comparator) {
+
+        _mapper = mapper;
+        _comparator = comparator;
+    }
+
+    @Override
+    public Function<T, Runnable> apply(Function<List<T>, Runnable> pipe) {
+        HashMap<K, TreeSet<T>> sets = new HashMap<>();
+        HashMap<K, Runnable> terminators = new HashMap<>();
+
+        return t -> {
+            K key = _mapper.apply(t);
+
+            TreeSet<T> set = sets.computeIfAbsent(
+                key, k -> new TreeSet<>(_comparator));
+
+            set.add(t);
+
+            publish(pipe, terminators, key, set);
+
+            return () -> {
+                set.remove(t);
+
+                publish(pipe, terminators, key, set);
+            };
+        };
+
+    }
+
+    private final Function<T, K> _mapper;
+    private final Comparator<T> _comparator;
+
+    private static <K, T> void publish(
+        Function<List<T>, Runnable> op, HashMap<K, Runnable> terminators,
+        K key, TreeSet<T> set) {
+
+        terminators.compute(key, (__, old) -> {
+            if (old != null) {
+                old.run();
+            }
+
+            return op.apply(new ArrayList<>(set));
+        });
+    }
+
+}

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Fri Nov 17 16:00:14 2017
@@ -28,7 +28,7 @@ import org.apache.aries.osgi.functional.
 import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl;
-import org.apache.aries.osgi.functional.internal.DistributeOSGi;
+import org.apache.aries.osgi.functional.internal.AllOSGi;
 import org.apache.aries.osgi.functional.internal.IgnoreImpl;
 import org.apache.aries.osgi.functional.internal.JustOSGiImpl;
 import org.apache.aries.osgi.functional.internal.NothingOSGiImpl;
@@ -58,6 +58,9 @@ import java.util.function.Supplier;
  */
 public interface OSGi<T> extends OSGiRunnable<T> {
        Runnable NOOP = () -> {};
+       
+       <K, S> OSGi<S> splitBy(
+               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
 
        OSGi<T> recover(BiFunction<T, Exception, T> onError);
 
@@ -66,10 +69,6 @@ public interface OSGi<T> extends OSGiRun
        OSGi<T> effects(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
 
-       static OSGi<Void> ignore(OSGi<?> program) {
-               return new IgnoreImpl(program);
-       }
-
        <S> OSGi<S> map(Function<? super T, ? extends S> function);
 
        <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
@@ -81,7 +80,11 @@ public interface OSGi<T> extends OSGiRun
        OSGi<Void> foreach(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
 
-       <S> OSGi<S> transformer(Function<Function<S, Runnable>, Function<T, 
Runnable>> fun);
+       <S> OSGi<S> transform(Function<Function<S, Runnable>, Function<T, 
Runnable>> fun);
+
+       static OSGi<Void> ignore(OSGi<?> program) {
+               return new IgnoreImpl(program);
+       }
 
        static OSGi<BundleContext> bundleContext() {
 
@@ -198,7 +201,7 @@ public interface OSGi<T> extends OSGiRun
        }
 
        public static <T> OSGi<T> once(OSGi<T> program) {
-               return program.transformer(op -> {
+               return program.transform(op -> {
                        AtomicInteger count = new AtomicInteger();
 
                        AtomicReference<Runnable> terminator = new 
AtomicReference<>();
@@ -259,7 +262,7 @@ public interface OSGi<T> extends OSGiRun
 
        @SafeVarargs
        static <T> OSGi<T> all(OSGi<T> ... programs) {
-               return new DistributeOSGi<>(programs);
+               return new AllOSGi<>(programs);
        }
 
        OSGi<T> filter(Predicate<T> predicate);

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815579&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 Fri Nov 17 16:00:14 2017
@@ -0,0 +1,80 @@
+package org.apache.aries.osgi.functional;
+
+import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
+import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public interface Utils {
+
+    static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
+        return program.transform(
+            new HighestRankingTransformer<>(Comparator.naturalOrder()));
+    }
+
+    static <T> OSGi<T> highest(
+        Comparator<? super T> comparator, OSGi<T> program) {
+
+        return program.transform(new HighestRankingTransformer<>(comparator));
+    }
+
+    static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
+        return program.transform(op -> {
+            ConcurrentDoublyLinkedList<T> list =
+                new ConcurrentDoublyLinkedList<>();
+
+            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
+
+            return t -> {
+                ConcurrentDoublyLinkedList.Node node = list.addLast(t);
+
+                publish(op, list, terminator);
+
+                return () -> {
+                    node.remove();
+
+                    publish(op, list, terminator);
+                };
+            };
+        });
+    }
+
+    static <T> void publish(Function<List<T>, Runnable> op, 
ConcurrentDoublyLinkedList<T> list, AtomicReference<Runnable> terminator) {
+        Runnable runnable = terminator.get();
+
+        runnable.run();
+
+        terminator.set(op.apply(new ArrayList<>(list)));
+    }
+
+    static <T> OSGi<T> republishIf(
+        BiPredicate<T, T> refresher, OSGi<T> program) {
+
+        return program.transform(op -> {
+            AtomicReference<T> old = new AtomicReference<>();
+            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
+
+            return t -> {
+                if (refresher.test(old.get(), t)) {
+                    terminator.get().run();
+
+                    old.set(t);
+                    terminator.set(op.apply(t));
+                }
+
+                return () -> {};
+            };
+        });
+    }
+
+}

Copied: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AllOSGi.java
 (from r1815578, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java)
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AllOSGi.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AllOSGi.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java&r1=1815578&r2=1815579&rev=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AllOSGi.java
 Fri Nov 17 16:00:14 2017
@@ -29,10 +29,10 @@ import java.util.stream.Collectors;
 /**
  * @author Carlos Sierra Andrés
  */
-public class DistributeOSGi<T> extends OSGiImpl<T> {
+public class AllOSGi<T> extends OSGiImpl<T> {
 
     @SafeVarargs
-    public DistributeOSGi(OSGi<T>... programs) {
+    public AllOSGi(OSGi<T>... programs) {
         super((bundleContext, op) -> {
             List<OSGiResult> results = new ArrayList<>();
 

Copied: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java
 (from r1815578, 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingTransformer.java)
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java&p1=aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingTransformer.java&r1=1815578&r2=1815579&rev=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/HighestRankingTransformer.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java
 Fri Nov 17 16:00:14 2017
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.aries.osgi.functional.test;
+package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.Transformer;
 
@@ -29,12 +29,15 @@ import static org.apache.aries.osgi.func
 /**
  * @author Carlos Sierra Andrés
  */
-public class HighestRankingTransformer<T extends Comparable<? super T>>
-    implements Transformer<T, T> {
+public class HighestRankingTransformer<T> implements Transformer<T, T> {
+
+    public HighestRankingTransformer(Comparator<? super T> comparator) {
+        _comparator = comparator;
+    }
 
     @Override
     public Function<T, Runnable> apply(Function<T, Runnable> publisher) {
-        PriorityQueue<T> set = new PriorityQueue<>(Comparator.reverseOrder());
+        PriorityQueue<T> set = new PriorityQueue<>(_comparator.reversed());
         AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
 
         return t -> {
@@ -69,4 +72,6 @@ public class HighestRankingTransformer<T
             };
         };
     }
+
+    private Comparator<? super T> _comparator;
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Fri Nov 17 16:00:14 2017
@@ -24,6 +24,7 @@ import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
@@ -61,13 +62,46 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
-       public <S> OSGi<S> transformer(
+       public <S> OSGi<S> transform(
                Function<Function<S, Runnable>, Function<T, Runnable>> fun) {
 
                return new TransformerOSGi<>(this, fun);
        }
 
        @Override
+       public <K, S> OSGi<S> splitBy(
+               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
+
+               return new OSGiImpl<>((bundleContext, op) -> {
+                       HashMap<K, Function<T, Runnable>> pipes = new 
HashMap<>();
+                       HashMap<K, OSGiResult> results = new HashMap<>();
+
+                       return _operation.run(
+                               bundleContext,
+                               t -> {
+                                       K key = mapper.apply(t);
+
+                                       results.computeIfAbsent(key, __ -> {
+                                               ProbeImpl<T> probe = new 
ProbeImpl<>();
+
+                                               OSGiImpl<S> program = 
(OSGiImpl<S>)fun.apply(probe);
+
+                                               OSGiResult r = 
program._operation.run(
+                                                       bundleContext, op);
+
+                                               pipes.put(key, 
probe.getOperation());
+
+                                               r.start();
+
+                                               return r;
+                                       });
+
+                                       return pipes.get(key).apply(t);
+                               });
+               });
+       }
+
+       @Override
        public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
                return new OSGiImpl<>((bundleContext, op) ->
                        _operation.run(

Copied: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 (from r1815578, 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java)
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java&p1=aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java&r1=1815578&r2=1815579&rev=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 Fri Nov 17 16:00:14 2017
@@ -17,12 +17,8 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.Event;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.osgi.framework.BundleContext;
 
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -34,42 +30,14 @@ public class ProbeImpl<T> extends OSGiIm
         super(new ProbeOperationImpl<>());
     }
 
-    public Function<T, SentEvent<T>> getOperation() {
-        return (t) -> {
-            ProbeOperationImpl<T> operation = (ProbeOperationImpl<T>) 
_operation;
-
-            AtomicReference<Runnable> terminator = new AtomicReference<>();
-
-            if (!operation.closed) {
-                terminator.set(operation._op.apply(t));
-            }
-
-            SentEvent<T> sentEvent = new SentEvent<T>() {
-                @Override
-                public Event<T> getEvent() {
-                    return () -> t;
-                }
-
-                @Override
-                public void terminate() {
-                    Runnable runnable = terminator.get();
-
-                    if (runnable != null) {
-                        runnable.run();
-                    }
-                }
-            };
-
-
-            return sentEvent;
-        };
+    public Function<T, Runnable> getOperation() {
+        return ((ProbeOperationImpl<T>) _operation)._op;
     }
 
     private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T> 
{
 
         BundleContext _bundleContext;
         Function<T, Runnable> _op;
-        volatile boolean closed;
 
         @Override
         public OSGiResultImpl run(
@@ -77,7 +45,7 @@ public class ProbeImpl<T> extends OSGiIm
             _bundleContext = bundleContext;
             _op = op;
 
-            return new OSGiResultImpl(NOOP, () -> closed = true);
+            return new OSGiResultImpl(NOOP, NOOP);
         }
     }
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 Fri Nov 17 16:00:14 2017
@@ -68,16 +68,16 @@ public class ProbeTests {
 
         program.run(bundleContext, result::set);
 
-        Function<String, SentEvent<String>> opA = probeA.getOperation();
+        Function<String, Runnable> opA = probeA.getOperation();
 
-        SentEvent<String> sentA = opA.apply("Hello");
+        Runnable sentA = opA.apply("Hello");
 
-        Function<String, SentEvent<String>> opB = 
probeBreference.get().getOperation();
+        Function<String, Runnable> opB = probeBreference.get().getOperation();
 
-        sentA.terminate();
+        sentA.run();
 
-        SentEvent<String> sentB = opB.apply(", World");
-        sentB.terminate();
+        Runnable sentB = opB.apply(", World");
+        sentB.run();
 
         assertEquals("", result.get());
 
@@ -91,8 +91,8 @@ public class ProbeTests {
 
         assertEquals("Hello, World", result.get());
 
-        sentA.terminate();
-        sentB.terminate();
+        sentA.run();
+        sentB.run();
 
         assertEquals("", result.get());
     }
@@ -103,7 +103,6 @@ public class ProbeTests {
 
         ProbeImpl<Integer> probeA = new ProbeImpl<>();
 
-        Function<Integer, SentEvent<Integer>> opA = probeA.getOperation();
         OSGi<Integer> just10 = just(10);
 
         OSGi<Integer> program = probeA.flatMap(a ->
@@ -116,19 +115,18 @@ public class ProbeTests {
         program.run(bundleContext, result::set);
         assertEquals(0, result.get());
 
-        SentEvent<Integer> sentA = opA.apply(5);
-        assertEquals(15, result.get());
+        Function<Integer, Runnable> opA = probeA.getOperation();
 
-        sentA.terminate();
-        assertEquals(17, result.get());
+        Runnable sentA = opA.apply(5);
+        assertEquals(15, result.get());
 
-        sentA.terminate();
+        sentA.run();
         assertEquals(17, result.get());
 
         sentA = opA.apply(10);
         assertEquals(20, result.get());
 
-        sentA.terminate();
+        sentA.run();
         assertEquals(22, result.get());
     }
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
 Fri Nov 17 16:00:14 2017
@@ -337,9 +337,9 @@ public class AsynchronousTest {
 
         result.start();
 
-        Function<Integer, SentEvent<Integer>> opa = ((ProbeImpl<Integer>) 
as).getOperation();
-        Function<Integer, SentEvent<Integer>> opb = ((ProbeImpl<Integer>) 
bs).getOperation();
-        Function<Integer, SentEvent<Integer>> opc = ((ProbeImpl<Integer>) 
cs).getOperation();
+        Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) 
as).getOperation();
+        Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) 
bs).getOperation();
+        Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) 
cs).getOperation();
 
         ExecutorService executor = Executors.newFixedThreadPool(8);
 
@@ -354,29 +354,29 @@ public class AsynchronousTest {
                     executor.execute(() -> {
                         ignoreException(() -> 
Thread.sleep(random.nextInt(10)));
 
-                        SentEvent<Integer> sentEvent = opa.apply(ii);
+                        Runnable sentEvent = opa.apply(ii);
 
                         ignoreException(() -> Thread.sleep(random.nextInt(2)));
 
-                        sentEvent.terminate();
+                        sentEvent.run();
                     });
                     executor.execute(() -> {
                         ignoreException(() -> Thread.sleep(random.nextInt(5)));
 
-                        SentEvent<Integer> sentEvent = opb.apply(jj);
+                        Runnable sentEvent = opb.apply(jj);
 
                         ignoreException(() -> Thread.sleep(random.nextInt(2)));
 
-                        sentEvent.terminate();
+                        sentEvent.run();
                     });
                     executor.execute(() -> {
                         ignoreException(() -> Thread.sleep(random.nextInt(2)));
 
-                        SentEvent<Integer> sentEvent = opc.apply(kk);
+                        Runnable sentEvent = opc.apply(kk);
 
                         ignoreException(() -> Thread.sleep(random.nextInt(2)));
 
-                        sentEvent.terminate();
+                        sentEvent.run();
                     });
                 }
             }

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
 Fri Nov 17 16:00:14 2017
@@ -20,6 +20,7 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -52,6 +53,7 @@ import static org.apache.aries.osgi.func
 import static org.apache.aries.osgi.functional.OSGi.register;
 import static org.apache.aries.osgi.functional.OSGi.serviceReferences;
 import static org.apache.aries.osgi.functional.OSGi.services;
+import static org.apache.aries.osgi.functional.Utils.highest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -335,13 +337,16 @@ public class ComponentTest {
         }
     }
 
-
     private static <T> OSGi<T> highestService(Class<T> clazz) {
-        return 
highest(clazz).map(CachingServiceReference::getServiceReference).flatMap(sr ->
-        bundleContext().flatMap(bc ->
-        onClose(() -> bc.ungetService(sr)).then(
-        just(bc.getService(sr))
-        )));
+        return
+            highest(
+                serviceReferences(clazz)).map(
+                    CachingServiceReference::getServiceReference).
+                flatMap(sr ->
+            bundleContext().flatMap(bc ->
+            onClose(() -> bc.ungetService(sr)).then(
+            just(bc.getService(sr))
+            )));
     }
 
     public static <T> OSGi<Void> dynamic(
@@ -394,13 +399,6 @@ public class ComponentTest {
 
     }
 
-    private static <T> OSGi<CachingServiceReference<T>> highest(
-        Class<T> clazz) {
-
-        return serviceReferences(clazz).transformer(
-            new HighestRankingTransformer<>());
-    }
-
     private class Service {}
 
     private class ServiceOptional {}

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815579&r1=1815578&r2=1815579&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 Fri Nov 17 16:00:14 2017
@@ -39,7 +39,6 @@ import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,7 +46,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
-import static org.apache.aries.osgi.functional.OSGi.NOOP;
 import static org.apache.aries.osgi.functional.OSGi.configuration;
 import static org.apache.aries.osgi.functional.OSGi.configurations;
 import static org.apache.aries.osgi.functional.OSGi.just;
@@ -56,6 +54,7 @@ import static org.apache.aries.osgi.func
 import static org.apache.aries.osgi.functional.OSGi.register;
 import static org.apache.aries.osgi.functional.OSGi.serviceReferences;
 import static org.apache.aries.osgi.functional.OSGi.services;
+import static org.apache.aries.osgi.functional.Utils.highest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -511,7 +510,7 @@ public class DSLTest {
             new AtomicReference<>();
 
         OSGi<Void> program =
-            highest(Service.class).
+            highest(serviceReferences(Service.class)).
             foreach(current::set, sr -> current.set(null));
 
         assertNull(current.get());
@@ -627,7 +626,6 @@ public class DSLTest {
     public void testOnce() {
         ProbeImpl<Integer> probe = new ProbeImpl<>();
 
-        Function<Integer, SentEvent<Integer>> op = probe.getOperation();
 
         AtomicInteger count = new AtomicInteger();
 
@@ -638,31 +636,33 @@ public class DSLTest {
 
         once.run(bundleContext);
 
+        Function<Integer, Runnable> op = probe.getOperation();
+
         assertEquals(0, count.get());
 
-        SentEvent<Integer> se = op.apply(1);
+        Runnable se = op.apply(1);
 
         assertEquals(1, count.get());
 
-        se.terminate();
+        se.run();
 
         assertEquals(0, count.get());
 
         se = op.apply(1);
-        SentEvent se2 = op.apply(2);
-        SentEvent se3 = op.apply(3);
+        Runnable se2 = op.apply(2);
+        Runnable se3 = op.apply(3);
 
         assertEquals(1, count.get());
 
-        se.terminate();
+        se.run();
 
         assertEquals(1, count.get());
 
-        se3.terminate();
+        se3.run();
 
         assertEquals(1, count.get());
 
-        se2.terminate();
+        se2.run();
 
         assertEquals(0, count.get());
     }
@@ -799,11 +799,6 @@ public class DSLTest {
         }
     }
 
-    private static <T> OSGi<CachingServiceReference<T>> highest(Class<T> 
clazz) {
-        return serviceReferences(clazz).transformer(
-            new HighestRankingTransformer<>());
-    }
-
     private class Service {}
 
 }

Added: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java?rev=1815579&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
 (added)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
 Fri Nov 17 16:00:14 2017
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.test;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.Utils.accumulate;
+import static org.apache.aries.osgi.functional.Utils.highest;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class UtilTest {
+
+    static BundleContext bundleContext = FrameworkUtil.getBundle(
+        UtilTest.class).getBundleContext();
+
+    @Test
+    public void testHighestsPer() {
+
+        OSGi<List<String>> program = just(Arrays.asList(
+            "apepe", "aana", "bvicente", "bcarlos", "cpepe", "ctomas"
+        )).splitBy(
+            x -> x.substring(0, 1),
+            p -> accumulate(p).effects(
+                t -> System.out.println("Incoming: " + t),
+                t -> System.out.println("Leaving: " + t)
+            )
+        ).effects(
+            t -> System.out.println("Incoming TOTAL: " + t),
+            t -> System.out.println("Leaving TOTAL: " + t)
+        );
+
+        /*OSGi<List<String>> program = republishIf(
+            (l1, l2) -> {
+                System.out.println("Checking: " + l1 + " : " + l2);
+                if (l1 == null) {
+                    return true;
+                }
+                if (l2.isEmpty()) {
+                    return true;
+                }
+                return !l1.subList(0, 1).equals(l2.subList(0, 1));
+            },
+            highestsPer(
+                x -> x.substring(0, 1), Comparator.naturalOrder(),
+                ))).
+            effects(
+                t -> System.out.println("Incoming: " + t),
+                t -> System.out.println("Leaving: " + t)
+            );
+        */
+        OSGiResult result = program.run(bundleContext);
+
+        result.close();
+    }
+
+
+}


Reply via email to