Author: csierra
Date: Fri Aug 25 11:21:38 2017
New Revision: 1806156

URL: http://svn.apache.org/viewvc?rev=1806156&view=rev
Log:
Looks like `all` should combine the results

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1806156&r1=1806155&r2=1806156&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Fri Aug 25 11:21:38 2017
@@ -163,8 +163,9 @@ public interface OSGi<T> extends OSGiRun
                return new ServiceReferenceOSGi<>(filterString, clazz);
        }
 
-       static OSGi<Void> all(OSGi<?> ... programs) {
-               return new DistributeOSGi(programs);
+       @SafeVarargs
+       static <T> OSGi<T> all(OSGi<T> ... programs) {
+               return new DistributeOSGi<>(programs);
        }
 
        OSGi<T> filter(Predicate<T> predicate);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1806156&r1=1806155&r2=1806156&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java Fri Aug 25 11:21:38 2017
@@ -29,33 +29,41 @@ import java.util.stream.Collectors;
 /**
  * @author Carlos Sierra Andrés
  */
-public class DistributeOSGi extends OSGiImpl<Void> {
+public class DistributeOSGi<T> extends OSGiImpl<T> {
 
-    public DistributeOSGi(OSGi<?>... programs) {
+    @SafeVarargs
+    public DistributeOSGi(OSGi<T>... programs) {
         super(bundleContext -> {
-            Pipe<Tuple<Void>, Tuple<Void>> added = Pipe.create();
+            Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
 
-            Consumer<Tuple<Void>> addedSource = added.getSource();
+            Consumer<Tuple<T>> addedSource = added.getSource();
 
-            List<OSGiResult<?>> results = new ArrayList<>();
+            List<OSGiResult<T>> results = new ArrayList<>();
 
-            Pipe<Tuple<Void>, Tuple<Void>> removed = Pipe.create();
+            Pipe<Tuple<T>, Tuple<T>> removed = Pipe.create();
 
-            Consumer<Tuple<Void>> removedSource = removed.getSource();
+            Consumer<Tuple<T>> removedSource = removed.getSource();
 
             return new OSGiResultImpl<>(
                 added, removed,
                 () -> {
                     results.addAll(
                         Arrays.stream(programs).
-                            map(o -> o.run(bundleContext)).
-                            collect(Collectors.toList()));
+                            map(o -> {
+                                OSGiResultImpl<T> osGiResult =
+                                    ((OSGiImpl<T>) o)._operation.run(
+                                        bundleContext);
+
+                                osGiResult.added.map(t -> {addedSource.accept(t); return null;});
+                                osGiResult.removed.map(t -> {removedSource.accept(t); return null;});
+
+                                osGiResult.start.run();
 
-                    addedSource.accept(Tuple.create(null));
+                                return osGiResult;
+                            }).
+                            collect(Collectors.toList()));
                 },
                 () -> {
-                    removedSource.accept(Tuple.create(null));
-
                     for (OSGiResult<?> result : results) {
                         try {
                             result.close();


Reply via email to