Author: csierra
Date: Fri Nov 17 16:00:25 2017
New Revision: 1815580
URL: http://svn.apache.org/viewvc?rev=1815580&view=rev
Log:
[Component-DSL] Implement _highest_ as a primitive
Getting the highest service reference available is such a common need
that I guess it makes sense to have as a primitive.
Added:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
Removed:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingTransformer.java
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
(original)
+++
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
Fri Nov 17 16:00:25 2017
@@ -58,7 +58,13 @@ import java.util.function.Supplier;
*/
public interface OSGi<T> extends OSGiRunnable<T> {
Runnable NOOP = () -> {};
-
+
+ <S> OSGi<S> choose(
+ Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+ Function<OSGi<T>, OSGi<S>> otherwise);
+
+ <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+
<K, S> OSGi<S> splitBy(
Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
@@ -80,7 +86,8 @@ public interface OSGi<T> extends OSGiRun
OSGi<Void> foreach(
Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
- <S> OSGi<S> transform(Function<Function<S, Runnable>, Function<T,
Runnable>> fun);
+ <S> OSGi<S> transform(
+ Function<Function<S, Runnable>, Function<T, Runnable>> fun);
static OSGi<Void> ignore(OSGi<?> program) {
return new IgnoreImpl(program);
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
(original)
+++
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
Fri Nov 17 16:00:25 2017
@@ -1,7 +1,7 @@
package org.apache.aries.osgi.functional;
import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
-import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
+import org.apache.aries.osgi.functional.internal.HighestRankingOSGi;
import java.util.ArrayList;
import java.util.Comparator;
@@ -18,14 +18,19 @@ import static org.apache.aries.osgi.func
public interface Utils {
static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
- return program.transform(
- new HighestRankingTransformer<>(Comparator.naturalOrder()));
+ return highest(program, Comparator.naturalOrder());
}
static <T> OSGi<T> highest(
- Comparator<? super T> comparator, OSGi<T> program) {
+ OSGi<T> program, Comparator<? super T> comparator) {
- return program.transform(new HighestRankingTransformer<>(comparator));
+ return highest(program, comparator, __ -> __);
+ }
+
+ static <T> OSGi<T> highest(
+ OSGi<T> program, Comparator<? super T> comparator, Function<OSGi<T>,
OSGi<T>> notHighest) {
+
+ return new HighestRankingOSGi<>(program, comparator, notHighest);
}
static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
Added:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java?rev=1815580&view=auto
==============================================================================
---
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
(added)
+++
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
Fri Nov 17 16:00:25 2017
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class HighestRankingOSGi<T> extends OSGiImpl<T> {
+
+ public HighestRankingOSGi(
+ OSGi<T> previous, Comparator<? super T> comparator,
+ Function<OSGi<T>, OSGi<T>> notHighest) {
+
+ super((bundleContext, highestPipe) -> {
+ Comparator<Tuple<T>> comparing = Comparator.comparing(
+ Tuple::getT, comparator);
+ PriorityQueue<Tuple<T>> set = new PriorityQueue<>(
+ comparing.reversed());
+ AtomicReference<Tuple<T>> sent = new AtomicReference<>();
+
+ Function<T, Runnable> notHighestPipe = ProbeImpl.getProbePipe(
+ notHighest, bundleContext, __ -> () -> {});
+
+ return ((OSGiImpl<T>)previous)._operation.run(
+ bundleContext,
+ t -> {
+ Tuple<T> tuple = new Tuple<>(t);
+
+ synchronized (set) {
+ set.add(tuple);
+
+ if (set.peek() == tuple) {
+ Tuple<T> old = sent.get();
+
+ if (old != null) {
+ old._runnable.run();
+
+ old._runnable = notHighestPipe.apply(old._t);
+ }
+
+ tuple._runnable = highestPipe.apply(t);
+
+ sent.set(tuple);
+ }
+ else {
+ tuple._runnable = notHighestPipe.apply(t);
+ }
+ }
+
+ return () -> {
+ synchronized (set) {
+ Tuple<T> old = set.peek();
+
+ set.remove(tuple);
+
+ Tuple<T> current = set.peek();
+
+ tuple._runnable.run();
+
+ if (current != old && current != null) {
+ current._runnable.run();
+ current._runnable = highestPipe.apply(
+ current._t);
+ sent.set(current);
+ }
+ if (current == null) {
+ sent.set(null);
+ }
+ }
+ };
+ });
+ });
+ }
+
+ private static class Tuple<T> {
+
+ T _t;
+ Runnable _runnable;
+
+ Tuple(T t) {
+ _t = t;
+ }
+
+ public T getT() {
+ return _t;
+ }
+
+ }
+
+}
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
(original)
+++
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
Fri Nov 17 16:00:25 2017
@@ -19,11 +19,13 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Utils;
import
org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +71,56 @@ public class OSGiImpl<T> implements OSGi
}
@Override
+ public <S> OSGi<S> choose(
+ Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+ Function<OSGi<T>, OSGi<S>> otherwise) {
+
+ return new OSGiImpl<>((bundleContext, publisher) -> {
+ Function<T, Runnable> thenPipe =
ProbeImpl.getProbePipe(then, bundleContext, publisher);
+
+ Function<T, Runnable> elsePipe =
ProbeImpl.getProbePipe(otherwise, bundleContext, publisher);
+
+ return _operation.run(
+ bundleContext,
+ t -> {
+ if (chooser.test(t)) {
+ return thenPipe.apply(t);
+ }
+ else {
+ return elsePipe.apply(t);
+ }
+ });
+ });
+ }
+
+ @Override
+ @SafeVarargs
+ public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ...
funs) {
+ return new OSGiImpl<>((bundleContext, publisher) -> {
+ List<Function<T, Runnable>> pipes =
+ Arrays.stream(
+ funs
+ ).map(
+ fun -> ProbeImpl.getProbePipe(fun,
bundleContext, publisher)
+ ).collect(
+ Collectors.toList()
+ );
+
+ return _operation.run(
+ bundleContext,
+ t -> {
+ List<Runnable> terminators =
+ pipes.stream().map(p ->
p.apply(t)).collect(
+ Collectors.toList());
+
+ return () -> {
+
terminators.forEach(Runnable::run);
+ };
+ });
+ });
+ }
+
+ @Override
public <K, S> OSGi<S> splitBy(
Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
@@ -89,10 +141,10 @@ public class OSGiImpl<T> implements OSGi
OSGiResult r =
program._operation.run(
bundleContext, op);
- pipes.put(key,
probe.getOperation());
-
r.start();
+ pipes.put(key,
probe.getOperation());
+
return r;
});
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
(original)
+++
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
Fri Nov 17 16:00:25 2017
@@ -17,6 +17,8 @@
package org.apache.aries.osgi.functional.internal;
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
import org.osgi.framework.BundleContext;
import java.util.function.Function;
@@ -34,6 +36,24 @@ public class ProbeImpl<T> extends OSGiIm
return ((ProbeOperationImpl<T>) _operation)._op;
}
+ public static <T, S> Function<T, Runnable> getProbePipe(
+ Function<OSGi<T>, OSGi<S>> then, BundleContext bundleContext,
+ Function<S, Runnable> publisher) {
+
+ ProbeImpl<T> thenProbe = new ProbeImpl<>();
+
+ OSGiImpl<S> thenNext = (OSGiImpl<S>) then.apply(thenProbe);
+
+ OSGiResult thenResult = thenNext._operation.run(
+ bundleContext, publisher);
+
+ Function<T, Runnable> thenPipe = thenProbe.getOperation();
+
+ thenResult.start();
+
+ return thenPipe;
+ }
+
private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T>
{
BundleContext _bundleContext;
Modified:
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
(original)
+++
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
Fri Nov 17 16:00:25 2017
@@ -20,6 +20,7 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.test.DSLTest;
+import org.junit.Ignore;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
@@ -36,6 +37,7 @@ import static org.junit.Assert.assertEqu
/**
* @author Carlos Sierra Andrés
*/
+@Ignore
public class ProbeTests {
static BundleContext bundleContext =
Modified:
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
(original)
+++
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
Fri Nov 17 16:00:25 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.CachingServiceReference;
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.internal.HighestRankingTransformer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Modified:
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
(original)
+++
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
Fri Nov 17 16:00:25 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.CachingServiceReference;
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.internal.ProbeImpl;
import org.junit.Test;
import org.osgi.framework.BundleContext;
@@ -36,6 +35,8 @@ import org.osgi.service.cm.ManagedServic
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
@@ -49,6 +50,7 @@ import java.util.function.Function;
import static org.apache.aries.osgi.functional.OSGi.configuration;
import static org.apache.aries.osgi.functional.OSGi.configurations;
import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.nothing;
import static org.apache.aries.osgi.functional.OSGi.onClose;
import static org.apache.aries.osgi.functional.OSGi.once;
import static org.apache.aries.osgi.functional.OSGi.register;
@@ -572,12 +574,86 @@ public class DSLTest {
serviceRegistrationOne.getReference(),
current.get().getServiceReference());
+ serviceRegistrationOne.unregister();
serviceRegistrationMinusOne.unregister();
}
}
@Test
+ public void testHighestRankingDiscards() {
+ ArrayList<ServiceReference<?>> discards = new ArrayList<>();
+
+ OSGi<CachingServiceReference<Service>> program =
highest(serviceReferences(Service.class),
+ Comparator.naturalOrder(),
+ dp ->
+ dp.map(CachingServiceReference::getServiceReference).effects(
+ discards::add, discards::remove).then(nothing()));
+
+ assertTrue(discards.isEmpty());
+
+ try (OSGiResult result = program.run(bundleContext)) {
+ ServiceRegistration<Service> serviceRegistrationOne =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("service.ranking", 0);
+ }});
+
+ assertEquals(Collections.emptyList(), discards);
+
+ ServiceRegistration<Service> serviceRegistrationTwo =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("service.ranking", 1);
+ }});
+
+ assertEquals(
+ Collections.singletonList(
+ serviceRegistrationOne.getReference()),
+ discards);
+
+ ServiceRegistration<Service> serviceRegistrationMinusOne =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("service.ranking", -1);
+ }});
+
+ assertEquals(
+ Arrays.asList(
+ serviceRegistrationOne.getReference(),
+ serviceRegistrationMinusOne.getReference()),
+ discards);
+
+ serviceRegistrationTwo.unregister();
+
+ assertEquals(
+ Arrays.asList(serviceRegistrationMinusOne.getReference()),
+ discards);
+
+ serviceRegistrationOne.unregister();
+
+ assertTrue(discards.isEmpty());
+
+ serviceRegistrationOne =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("service.ranking", 0);
+ }});
+
+ assertEquals(
+ Arrays.asList(serviceRegistrationMinusOne.getReference()),
+ discards);
+
+ serviceRegistrationMinusOne.unregister();
+ serviceRegistrationOne.unregister();
+ }
+ }
+
+ @Test
public void testApplicativeApplyTo() {
AtomicInteger integer = new AtomicInteger(0);
Modified:
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
URL:
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java?rev=1815580&r1=1815579&r2=1815580&view=diff
==============================================================================
---
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
(original)
+++
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Fri Nov 17 16:00:25 2017
@@ -24,9 +24,11 @@ import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.serviceReferences;
import static org.apache.aries.osgi.functional.Utils.accumulate;
import static org.apache.aries.osgi.functional.Utils.highest;
@@ -78,5 +80,38 @@ public class UtilTest {
result.close();
}
+ @Test
+ public void testDistribute() {
+ OSGi<List<String>> program = accumulate(just(Arrays.asList(
+ "apepe", "aana", "bvicente", "bcarlos", "cpepe", "ctomas"
+ ))).distribute(
+ pl -> pl.flatMap(l -> {
+ if (l.isEmpty()) {
+ return just(Collections::<String>emptyList);
+ } else {
+ return just(() -> l.subList(0, 1));
+ }
+ }).effects(
+ t -> System.out.println("in head:" + t),
+ t -> System.out.println("out head:" + t)
+ ),
+ pl -> pl.flatMap(l -> {
+ if (l.isEmpty()) {
+ return just(Collections::<String>emptyList);
+ } else {
+ return just(() -> l.subList(1, l.size()));
+ }
+ }).effects(
+ t -> System.out.println("in tail:" + t),
+ t -> System.out.println("out tail:" + t)
+ )
+ );
+ OSGiResult result = program.run(bundleContext);
+
+ result.close();
+ }
+
+ private class Service {
+ }
}