Author: csierra
Date: Fri Nov 17 16:00:25 2017
New Revision: 1815580

URL: http://svn.apache.org/viewvc?rev=1815580&view=rev
Log:
[Component-DSL] Implement _highest_ as a primitive

Getting the highest service reference available is such a common need
that I guess it makes sense to have as a primitive.

Added:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
Removed:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java
Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Fri Nov 17 16:00:25 2017
@@ -58,7 +58,13 @@ import java.util.function.Supplier;
  */
 public interface OSGi<T> extends OSGiRunnable<T> {
        Runnable NOOP = () -> {};
-       
+
+       <S> OSGi<S> choose(
+               Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+               Function<OSGi<T>, OSGi<S>> otherwise);
+
+       <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+
        <K, S> OSGi<S> splitBy(
                Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
 
@@ -80,7 +86,8 @@ public interface OSGi<T> extends OSGiRun
        OSGi<Void> foreach(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
 
-       <S> OSGi<S> transform(Function<Function<S, Runnable>, Function<T, 
Runnable>> fun);
+       <S> OSGi<S> transform(
+               Function<Function<S, Runnable>, Function<T, Runnable>> fun);
 
        static OSGi<Void> ignore(OSGi<?> program) {
                return new IgnoreImpl(program);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 Fri Nov 17 16:00:25 2017
@@ -1,7 +1,7 @@
 package org.apache.aries.osgi.functional;
 
 import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
-import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
+import org.apache.aries.osgi.functional.internal.HighestRankingOSGi;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -18,14 +18,19 @@ import static org.apache.aries.osgi.func
 public interface Utils {
 
     static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
-        return program.transform(
-            new HighestRankingTransformer<>(Comparator.naturalOrder()));
+        return highest(program, Comparator.naturalOrder());
     }
 
     static <T> OSGi<T> highest(
-        Comparator<? super T> comparator, OSGi<T> program) {
+        OSGi<T> program, Comparator<? super T> comparator) {
 
-        return program.transform(new HighestRankingTransformer<>(comparator));
+        return highest(program, comparator, __ -> __);
+    }
+
+    static <T> OSGi<T> highest(
+        OSGi<T> program, Comparator<? super T> comparator, Function<OSGi<T>, 
OSGi<T>> notHighest) {
+
+        return new HighestRankingOSGi<>(program, comparator, notHighest);
     }
 
     static <T> OSGi<List<T>> accumulate(OSGi<T> program) {

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java?rev=1815580&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
 Fri Nov 17 16:00:25 2017
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class HighestRankingOSGi<T> extends OSGiImpl<T> {
+
+    public HighestRankingOSGi(
+        OSGi<T> previous, Comparator<? super T> comparator,
+        Function<OSGi<T>, OSGi<T>> notHighest) {
+
+        super((bundleContext, highestPipe) -> {
+            Comparator<Tuple<T>> comparing = Comparator.comparing(
+                Tuple::getT, comparator);
+            PriorityQueue<Tuple<T>> set = new PriorityQueue<>(
+                comparing.reversed());
+            AtomicReference<Tuple<T>> sent = new AtomicReference<>();
+
+            Function<T, Runnable> notHighestPipe = ProbeImpl.getProbePipe(
+                notHighest, bundleContext, __ -> () -> {});
+
+            return ((OSGiImpl<T>)previous)._operation.run(
+                bundleContext,
+                t -> {
+                    Tuple<T> tuple = new Tuple<>(t);
+
+                    synchronized (set) {
+                        set.add(tuple);
+
+                        if (set.peek() == tuple) {
+                            Tuple<T> old = sent.get();
+
+                            if (old != null) {
+                                old._runnable.run();
+
+                                old._runnable = notHighestPipe.apply(old._t);
+                            }
+
+                            tuple._runnable = highestPipe.apply(t);
+
+                            sent.set(tuple);
+                        }
+                        else {
+                            tuple._runnable = notHighestPipe.apply(t);
+                        }
+                    }
+
+                    return () -> {
+                        synchronized (set) {
+                            Tuple<T> old = set.peek();
+
+                            set.remove(tuple);
+
+                            Tuple<T> current = set.peek();
+
+                            tuple._runnable.run();
+
+                            if (current != old && current != null) {
+                                current._runnable.run();
+                                current._runnable = highestPipe.apply(
+                                    current._t);
+                                sent.set(current);
+                            }
+                            if (current == null) {
+                                sent.set(null);
+                            }
+                        }
+                    };
+                });
+        });
+    }
+
+    private static class Tuple<T> {
+
+        T _t;
+        Runnable _runnable;
+
+        Tuple(T t) {
+            _t = t;
+        }
+
+        public T getT() {
+            return _t;
+        }
+
+    }
+
+}

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Fri Nov 17 16:00:25 2017
@@ -19,11 +19,13 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Utils;
 import 
org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +71,56 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
+       public <S> OSGi<S> choose(
+               Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+               Function<OSGi<T>, OSGi<S>> otherwise) {
+
+               return new OSGiImpl<>((bundleContext, publisher) -> {
+                       Function<T, Runnable> thenPipe = 
ProbeImpl.getProbePipe(then, bundleContext, publisher);
+
+                       Function<T, Runnable> elsePipe = 
ProbeImpl.getProbePipe(otherwise, bundleContext, publisher);
+
+                       return _operation.run(
+                               bundleContext,
+                               t -> {
+                                       if (chooser.test(t)) {
+                                               return thenPipe.apply(t);
+                                       }
+                                       else {
+                                               return elsePipe.apply(t);
+                                       }
+                               });
+               });
+       }
+
+       @Override
+       @SafeVarargs
+       public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... 
funs) {
+               return new OSGiImpl<>((bundleContext, publisher) -> {
+                       List<Function<T, Runnable>> pipes =
+                               Arrays.stream(
+                                       funs
+                               ).map(
+                                       fun -> ProbeImpl.getProbePipe(fun, 
bundleContext, publisher)
+                               ).collect(
+                                       Collectors.toList()
+                       );
+
+                       return _operation.run(
+                               bundleContext,
+                               t -> {
+                                       List<Runnable> terminators =
+                                               pipes.stream().map(p -> 
p.apply(t)).collect(
+                                                       Collectors.toList());
+
+                                       return () -> {
+                                               
terminators.forEach(Runnable::run);
+                                       };
+                               });
+               });
+       }
+
+       @Override
        public <K, S> OSGi<S> splitBy(
                Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
 
@@ -89,10 +141,10 @@ public class OSGiImpl<T> implements OSGi
                                                OSGiResult r = 
program._operation.run(
                                                        bundleContext, op);
 
-                                               pipes.put(key, 
probe.getOperation());
-
                                                r.start();
 
+                                               pipes.put(key, 
probe.getOperation());
+
                                                return r;
                                        });
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 Fri Nov 17 16:00:25 2017
@@ -17,6 +17,8 @@
 
 package org.apache.aries.osgi.functional.internal;
 
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.BundleContext;
 
 import java.util.function.Function;
@@ -34,6 +36,24 @@ public class ProbeImpl<T> extends OSGiIm
         return ((ProbeOperationImpl<T>) _operation)._op;
     }
 
+    public static <T, S> Function<T, Runnable> getProbePipe(
+        Function<OSGi<T>, OSGi<S>> then, BundleContext bundleContext,
+        Function<S, Runnable> publisher) {
+
+        ProbeImpl<T> thenProbe = new ProbeImpl<>();
+
+        OSGiImpl<S> thenNext = (OSGiImpl<S>) then.apply(thenProbe);
+
+        OSGiResult thenResult = thenNext._operation.run(
+            bundleContext, publisher);
+
+        Function<T, Runnable> thenPipe = thenProbe.getOperation();
+
+        thenResult.start();
+
+        return thenPipe;
+    }
+
     private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T> 
{
 
         BundleContext _bundleContext;

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 Fri Nov 17 16:00:25 2017
@@ -20,6 +20,7 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.test.DSLTest;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -36,6 +37,7 @@ import static org.junit.Assert.assertEqu
 /**
  * @author Carlos Sierra Andrés
  */
+@Ignore
 public class ProbeTests {
 
     static BundleContext bundleContext =

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
 Fri Nov 17 16:00:25 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 Fri Nov 17 16:00:25 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
@@ -36,6 +35,8 @@ import org.osgi.service.cm.ManagedServic
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -49,6 +50,7 @@ import java.util.function.Function;
 import static org.apache.aries.osgi.functional.OSGi.configuration;
 import static org.apache.aries.osgi.functional.OSGi.configurations;
 import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.nothing;
 import static org.apache.aries.osgi.functional.OSGi.onClose;
 import static org.apache.aries.osgi.functional.OSGi.once;
 import static org.apache.aries.osgi.functional.OSGi.register;
@@ -572,12 +574,86 @@ public class DSLTest {
                 serviceRegistrationOne.getReference(),
                 current.get().getServiceReference());
 
+            serviceRegistrationOne.unregister();
             serviceRegistrationMinusOne.unregister();
         }
 
     }
 
     @Test
+    public void testHighestRankingDiscards() {
+        ArrayList<ServiceReference<?>> discards = new ArrayList<>();
+
+        OSGi<CachingServiceReference<Service>> program = 
highest(serviceReferences(Service.class),
+            Comparator.naturalOrder(),
+            dp ->
+                dp.map(CachingServiceReference::getServiceReference).effects(
+                    discards::add, discards::remove).then(nothing()));
+
+        assertTrue(discards.isEmpty());
+
+        try (OSGiResult result = program.run(bundleContext)) {
+            ServiceRegistration<Service> serviceRegistrationOne =
+                bundleContext.registerService(
+                    Service.class, new Service(),
+                    new Hashtable<String, Object>() {{
+                        put("service.ranking", 0);
+                    }});
+
+            assertEquals(Collections.emptyList(), discards);
+
+            ServiceRegistration<Service> serviceRegistrationTwo =
+                bundleContext.registerService(
+                    Service.class, new Service(),
+                    new Hashtable<String, Object>() {{
+                        put("service.ranking", 1);
+                    }});
+
+            assertEquals(
+                Collections.singletonList(
+                    serviceRegistrationOne.getReference()),
+                discards);
+
+            ServiceRegistration<Service> serviceRegistrationMinusOne =
+                bundleContext.registerService(
+                    Service.class, new Service(),
+                    new Hashtable<String, Object>() {{
+                        put("service.ranking", -1);
+                    }});
+
+            assertEquals(
+                Arrays.asList(
+                    serviceRegistrationOne.getReference(),
+                    serviceRegistrationMinusOne.getReference()),
+                discards);
+
+            serviceRegistrationTwo.unregister();
+
+            assertEquals(
+                Arrays.asList(serviceRegistrationMinusOne.getReference()),
+                discards);
+
+            serviceRegistrationOne.unregister();
+
+            assertTrue(discards.isEmpty());
+
+            serviceRegistrationOne =
+                bundleContext.registerService(
+                    Service.class, new Service(),
+                    new Hashtable<String, Object>() {{
+                        put("service.ranking", 0);
+                    }});
+
+            assertEquals(
+                Arrays.asList(serviceRegistrationMinusOne.getReference()),
+                discards);
+
+            serviceRegistrationMinusOne.unregister();
+            serviceRegistrationOne.unregister();
+        }
+    }
+
+    @Test
     public void testApplicativeApplyTo() {
         AtomicInteger integer = new AtomicInteger(0);
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
 Fri Nov 17 16:00:25 2017
@@ -24,9 +24,11 @@ import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.serviceReferences;
 import static org.apache.aries.osgi.functional.Utils.accumulate;
 import static org.apache.aries.osgi.functional.Utils.highest;
 
@@ -78,5 +80,38 @@ public class UtilTest {
         result.close();
     }
 
+    @Test
+    public void testDistribute() {
+        OSGi<List<String>> program = accumulate(just(Arrays.asList(
+            "apepe", "aana", "bvicente", "bcarlos", "cpepe", "ctomas"
+        ))).distribute(
+            pl -> pl.flatMap(l -> {
+                if (l.isEmpty()) {
+                    return just(Collections::<String>emptyList);
+                } else {
+                    return just(() -> l.subList(0, 1));
+                }
+            }).effects(
+                t -> System.out.println("in head:" + t),
+                t -> System.out.println("out head:" + t)
+            ),
+            pl -> pl.flatMap(l -> {
+                if (l.isEmpty()) {
+                    return just(Collections::<String>emptyList);
+                } else {
+                    return just(() -> l.subList(1, l.size()));
+                }
+            }).effects(
+                t -> System.out.println("in tail:" + t),
+                t -> System.out.println("out tail:" + t)
+            )
+        );
 
+        OSGiResult result = program.run(bundleContext);
+
+        result.close();
+    }
+
+    private class Service {
+    }
 }


Reply via email to