Author: csierra
Date: Mon Nov 28 16:42:38 2016
New Revision: 1771765

URL: http://svn.apache.org/viewvc?rev=1771765&view=rev
Log:
Classes refactoring and proper type bounds

Added:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
Removed:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/MOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleMOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/MOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesMOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesMOSGi.java
Modified:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Mon Nov 28 16:42:38 2016
@@ -19,17 +19,17 @@
 package org.apache.aries.osgi.functional;
 
 import org.apache.aries.osgi.functional.internal.BundleContextOSGiImpl;
-import org.apache.aries.osgi.functional.internal.BundleMOSGi;
+import org.apache.aries.osgi.functional.internal.BundleOSGi;
 import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl;
 import org.apache.aries.osgi.functional.internal.JustOSGiImpl;
 import org.apache.aries.osgi.functional.internal.NothingOSGiImpl;
 import org.apache.aries.osgi.functional.internal.OnCloseOSGiImpl;
-import org.apache.aries.osgi.functional.internal.PrototypesMOSGi;
+import org.apache.aries.osgi.functional.internal.PrototypesOSGi;
 import org.apache.aries.osgi.functional.internal.ServiceReferenceOSGi;
 import org.apache.aries.osgi.functional.internal.ServiceRegistrationOSGiImpl;
-import org.apache.aries.osgi.functional.internal.ServicesMOSGi;
+import org.apache.aries.osgi.functional.internal.ServicesOSGi;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceObjects;
@@ -38,7 +38,9 @@ import org.osgi.framework.ServiceRegistr
 
 import java.util.Dictionary;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * @author Carlos Sierra Andrés
@@ -46,21 +48,21 @@ import java.util.function.Function;
 public interface OSGi<T> extends OSGiRunnable<T> {
        Runnable NOOP = () -> {};
 
-       <S> OSGi<S> map(Function<T, S> function);
+       <S> OSGi<S> map(Function<? super T, ? extends S> function);
 
-       <S> OSGi<S> flatMap(Function<T, OSGi<S>> fun);
+       <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
 
        <S> OSGi<S> then(OSGi<S> next);
 
-       <S> OSGi<Void> foreach(Function<T, OSGi<S>> fun);
+       OSGi<Void> foreach(Consumer<? super T> action);
 
        static OSGi<BundleContext> bundleContext() {
 
                return new BundleContextOSGiImpl();
        }
 
-       static MOSGi<Bundle> bundles(int stateMask) {
-               return new BundleMOSGi(stateMask);
+       static OSGi<Bundle> bundles(int stateMask) {
+               return new BundleOSGi(stateMask);
        }
 
        static <T> OSGi<T> changeContext(
@@ -77,6 +79,10 @@ public interface OSGi<T> extends OSGiRun
                return new ConfigurationsOSGiImpl(factoryPid);
        }
 
+       static <S> OSGi<S> just(S s) {
+               return new JustOSGiImpl<>(s);
+       }
+
        static <S> OSGi<S> nothing() {
                return new NothingOSGiImpl<>();
        }
@@ -85,22 +91,18 @@ public interface OSGi<T> extends OSGiRun
                return new OnCloseOSGiImpl(action);
        }
 
-       static <S> OSGi<S> just(S s) {
-               return new JustOSGiImpl<>(s);
-       }
-
-       static MOSGi<ServiceObjects<Object>> prototypes(String filterString) {
+       static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
                return prototypes(null, filterString);
        }
 
-       static <T> MOSGi<ServiceObjects<T>> prototypes(Class<T> clazz) {
+       static <T> OSGi<ServiceObjects<T>> prototypes(Class<T> clazz) {
                return prototypes(clazz, null);
        }
 
-       static <T> MOSGi<ServiceObjects<T>> prototypes(
+       static <T> OSGi<ServiceObjects<T>> prototypes(
                Class<T> clazz, String filterString) {
 
-               return new PrototypesMOSGi<>(clazz, filterString);
+               return new PrototypesOSGi<>(clazz, filterString);
        }
 
        static <T, S extends T> OSGi<ServiceRegistration<T>> register(
@@ -110,16 +112,16 @@ public interface OSGi<T> extends OSGiRun
                        clazz, service, properties);
        }
 
-       static <T> MOSGi<T> services(Class<T> clazz) {
+       static <T> OSGi<T> services(Class<T> clazz) {
                return services(clazz, null);
        }
 
-       static <T> MOSGi<Object> services(String filterString) {
+       static <T> OSGi<Object> services(String filterString) {
                return services(null, filterString);
        }
 
-       static <T> MOSGi<T> services(Class<T> clazz, String filterString) {
-               return new ServicesMOSGi<>(clazz, filterString);
+       static <T> OSGi<T> services(Class<T> clazz, String filterString) {
+               return new ServicesOSGi<>(clazz, filterString);
        }
 
        static <T> OSGi<ServiceReference<T>> serviceReferences(
@@ -140,4 +142,7 @@ public interface OSGi<T> extends OSGiRun
                return new ServiceReferenceOSGi<>(filterString, clazz);
        }
 
+       OSGi<T> filter(Predicate<T> predicate);
+
+       OSGi<Void> distribute(Function<T, OSGi<?>>... funs);
 }

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -19,24 +19,13 @@ package org.apache.aries.osgi.functional
 
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Consumer;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public class BundleContextOSGiImpl extends OSGiImpl<BundleContext> {
 
        public BundleContextOSGiImpl() {
-               super(bundleContext -> {
-                       Pipe<Tuple<BundleContext>, Tuple<BundleContext>> added =
-                               Pipe.create();
-
-                       Consumer<Tuple<BundleContext>> addedSource = 
added.getSource();
-
-                       return new OSGiResultImpl<>(
-                               added, Pipe.create(),
-                               () -> 
addedSource.accept(Tuple.create(bundleContext)),
-                               NOOP);
-               });
+               super(bundleContext ->
+                       new 
JustOSGiImpl<>(bundleContext)._operation.run(bundleContext));
        }
 }

Added: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1771765&view=auto
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (added)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Mon Nov 28 16:42:38 2016
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleEvent;
+import org.osgi.util.tracker.BundleTracker;
+import org.osgi.util.tracker.BundleTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class BundleOSGi extends OSGiImpl<Bundle> {
+
+       private final int _stateMask;
+
+       public BundleOSGi(int stateMask) {
+               super(bundleContext -> {
+                       Pipe<Tuple<Bundle>, Tuple<Bundle>> added = 
Pipe.create();
+
+                       Consumer<Tuple<Bundle>> addedSource = added.getSource();
+
+                       Pipe<Tuple<Bundle>, Tuple<Bundle>> removed = 
Pipe.create();
+
+                       Consumer<Tuple<Bundle>> removedSource = 
removed.getSource();
+
+                       BundleTracker<Tuple<Bundle>> bundleTracker =
+                               new BundleTracker<>(
+                                       bundleContext, stateMask,
+                                       new 
BundleTrackerCustomizer<Tuple<Bundle>>() {
+
+                                               @Override
+                                               public Tuple<Bundle> 
addingBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent) {
+
+                                                       Tuple<Bundle> tuple = 
Tuple.create(bundle);
+
+                                                       
addedSource.accept(tuple);
+
+                                                       return tuple;
+                                               }
+
+                                               @Override
+                                               public void modifiedBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent,
+                                                       Tuple<Bundle> tuple) {
+
+                                                       removedBundle(bundle, 
bundleEvent, tuple);
+
+                                                       addingBundle(bundle, 
bundleEvent);
+                                               }
+
+                                               @Override
+                                               public void removedBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent,
+                                                       Tuple<Bundle> tuple) {
+
+                                                       
removedSource.accept(tuple);
+                                               }
+                                       });
+
+                       return new OSGiResultImpl<>(
+                               added, removed, bundleTracker::open, 
bundleTracker::close);
+               });
+               _stateMask = stateMask;
+       }
+
+       @Override
+       public <S> OSGiImpl<S> flatMap(Function<? super Bundle, OSGi<? extends 
S>> fun) {
+               return new OSGiImpl<>(bundleContext -> {
+                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+                       Consumer<Tuple<S>> addedSource = added.getSource();
+
+                       Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+                       Consumer<Tuple<S>> removedSource = removed.getSource();
+
+                       BundleTracker<Tracked<Bundle, S>> bundleTracker =
+                               new BundleTracker<>(
+                                       bundleContext, _stateMask,
+                                       new 
BundleTrackerCustomizer<Tracked<Bundle, S>>() {
+
+                                               @Override
+                                               public Tracked<Bundle, S> 
addingBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent) {
+
+                                                       OSGiImpl<S> program = 
(OSGiImpl<S>) fun.apply(
+                                                               bundle);
+
+                                                       OSGiResultImpl<S> 
result =
+                                                               
program._operation.run(bundleContext);
+
+                                                       Tracked<Bundle, S> 
tracked = new Tracked<>();
+
+                                                       tracked.service = 
bundle;
+                                                       tracked.program = 
result;
+
+                                                       result.added.map(s -> {
+                                                               tracked.result 
= s;
+
+                                                               
addedSource.accept(s);
+
+                                                               return s;
+                                                       });
+
+                                                       result.start.run();
+
+                                                       return tracked;
+                                               }
+
+                                               @Override
+                                               public void modifiedBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent,
+                                                       Tracked<Bundle, S> 
tracked) {
+
+                                                       removedBundle(bundle, 
bundleEvent, tracked);
+
+                                                       addingBundle(bundle, 
bundleEvent);
+                                               }
+
+                                               @Override
+                                               public void removedBundle(
+                                                       Bundle bundle, 
BundleEvent bundleEvent,
+                                                       Tracked<Bundle, S> 
tracked) {
+
+                                                       tracked.program.close();
+
+                                                       if (tracked.result != 
null) {
+                                                               
removedSource.accept(tracked.result);
+                                                       }
+                                               }
+                                       });
+
+                       return new OSGiResultImpl<>(
+                               added, removed, bundleTracker::open, 
bundleTracker::close);
+
+               });
+       }
+
+}

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -19,24 +19,56 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
 
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class JustOSGiImpl<S> extends OSGiImpl<S> {
+public class JustOSGiImpl<T> extends OSGiImpl<T> {
 
-       public JustOSGiImpl(S s) {
+       private T _t;
+
+       public JustOSGiImpl(T t) {
                super(((bundleContext) -> {
 
-                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                       Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
 
-                       Consumer<Tuple<S>> source = added.getSource();
+                       Consumer<Tuple<T>> source = added.getSource();
 
                        return new OSGiResultImpl<>(
                                added, Pipe.create(),
-                               () -> source.accept(Tuple.create(s)), 
OSGi.NOOP);
+                               () -> source.accept(Tuple.create(t)), 
OSGi.NOOP);
                }));
+
+               _t = t;
+       }
+
+       @Override
+       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
+               return new OSGiImpl<>(bundleContext -> {
+                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+                       Consumer<Tuple<S>> addedSource = added.getSource();
+
+                       AtomicReference<OSGiResult<? extends S>> 
atomicReference =
+                               new AtomicReference<>(null);
+
+                       return new OSGiResultImpl<>(
+                               added, Pipe.create(),
+                               () -> {
+                                       OSGi<? extends S> next = fun.apply(_t);
+
+                                       atomicReference.set(
+                                               next.run(
+                                                       bundleContext,
+                                                       s -> 
addedSource.accept(Tuple.create(s))));
+
+                               },
+                               () -> atomicReference.get().close());
+               });
        }
 }

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -23,11 +23,16 @@ import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * @author Carlos Sierra Andrés
@@ -41,10 +46,11 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
-       public <S> OSGiImpl<S> flatMap(Function<T, OSGi<S>> fun) {
+       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
                return new OSGiImpl<>(
                        ((bundleContext) -> {
-                               Map<Object, OSGiResult<S>> identities = new 
IdentityHashMap<>();
+                               Map<Object, OSGiResult<? extends S>> identities 
=
+                                       new IdentityHashMap<>();
 
                                AtomicReference<Runnable> closeReference =
                                        new AtomicReference<>(NOOP);
@@ -69,9 +75,9 @@ public class OSGiImpl<T> implements OSGi
                                        closeReference.set(or1.close);
 
                                        or1.added.map(t -> {
-                                               OSGi<S> program = 
fun.apply(t.t);
+                                               OSGi<? extends S> program = 
fun.apply(t.t);
 
-                                               OSGiResult<S> or2 = program.run(
+                                               OSGiResult<? extends S> or2 = 
program.run(
                                                        bundleContext,
                                                        s -> 
addedSource.accept(Tuple.create(s)));
 
@@ -82,8 +88,8 @@ public class OSGiImpl<T> implements OSGi
 
                                        or1.removed.map(t -> {
                                                synchronized (identities) {
-                                                       OSGiResult<S> 
osgiResult1 = identities.remove(
-                                                               t.original);
+                                                       OSGiResult<? extends S> 
osgiResult1 =
+                                                               
identities.remove(t.original);
 
                                                        if (osgiResult1 != 
null) {
                                                                
osgiResult1.close();
@@ -102,12 +108,12 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
-       public <S> OSGi<Void> foreach(Function<T, OSGi<S>> fun) {
-               return this.flatMap(fun).map(x -> null);
+       public OSGi<Void> foreach(Consumer<? super T> consumer) {
+               return this.map(f ->  {consumer.accept(f); return null;});
        }
 
        @Override
-       public <S> OSGi<S> map(Function<T, S> function) {
+       public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
                return new OSGiImpl<>(((bundleContext) -> {
                        OSGiResultImpl<T> osgiResult = 
_operation.run(bundleContext);
 
@@ -190,6 +196,43 @@ public class OSGiImpl<T> implements OSGi
                return stringBuilder.toString();
        }
 
+       @Override
+       public OSGi<T> filter(Predicate<T> predicate) {
+               return flatMap(t -> {
+                       if (predicate.test(t)) {
+                               return OSGi.just(t);
+                       }
+                       else {
+                               return OSGi.nothing();
+                       }
+               });
+       }
+
+       @Override
+       public OSGi<Void> distribute(Function<T, OSGi<?>>... funs) {
+               return new OSGiImpl<>(bundleContext -> {
+                       ArrayList<OSGiResult> results = new ArrayList<>();
+
+                       Pipe<Tuple<Void>, Tuple<Void>> added = Pipe.create();
+
+                       Consumer<Tuple<Void>> addedSource = added.getSource();
+
+                       return new OSGiResultImpl<>(
+                               added, Pipe.create(),
+                               () -> {
+                                       List<OSGiResult> results2 = 
Arrays.stream(funs).
+                                               map(this::flatMap).
+                                               map(o -> o.run(bundleContext)).
+                                               collect(Collectors.toList());
+
+                                       results.addAll(results2);
+
+                                       addedSource.accept(Tuple.create(null));
+                               },
+                               () -> 
results.stream().forEach(OSGiResult::close)
+                       );
+               });
+       }
 }
 
 

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java Mon Nov 28 16:42:38 2016
@@ -39,7 +39,7 @@ class Pipe<I, O> {
                return i -> pipe.apply(i);
        }
 
-       <U> Pipe<I, U> map(Function<O, U> fun) {
+       <U> Pipe<I, U> map(Function<? super O, ? extends U> fun) {
                this.pipe = (Function)this.pipe.andThen(fun);
 
                return (Pipe<I, U>)this;

Added: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java?rev=1771765&view=auto
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java (added)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java Mon Nov 28 16:42:38 2016
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.ServiceObjects;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class PrototypesOSGi<T>
+       extends OSGiImpl<ServiceObjects<T>> {
+
+       private final String _filterString;
+
+       private final Class<T> _clazz;
+
+       public PrototypesOSGi(Class<T> clazz, String filterString) {
+               super(bundleContext -> {
+                       Pipe<Tuple<ServiceObjects<T>>, 
Tuple<ServiceObjects<T>>> added =
+                               Pipe.create();
+
+                       Pipe<Tuple<ServiceObjects<T>>, Tuple<ServiceObjects<T>>>
+                               removed = Pipe.create();
+
+                       Consumer<Tuple<ServiceObjects<T>>> addedSource =
+                               added.getSource();
+
+                       Consumer<Tuple<ServiceObjects<T>>> removedSource =
+                               removed.getSource();
+
+                       ServiceTracker<T, Tuple<ServiceObjects<T>>> 
serviceTracker =
+                               new ServiceTracker<>(
+                                       bundleContext,
+                                       OSGiImpl.buildFilter(
+                                               bundleContext, filterString, 
clazz),
+                                       new ServiceTrackerCustomizer
+                                               <T, Tuple<ServiceObjects<T>>>() 
{
+
+                                               @Override
+                                               public Tuple<ServiceObjects<T>> 
addingService(
+                                                       ServiceReference<T> 
reference) {
+
+                                                       ServiceObjects<T> 
serviceObjects =
+                                                               
bundleContext.getServiceObjects(reference);
+
+                                                       
Tuple<ServiceObjects<T>> tuple =
+                                                               
Tuple.create(serviceObjects);
+
+                                                       
addedSource.accept(tuple);
+
+                                                       return tuple;
+                                               }
+
+                                               @Override
+                                               public void modifiedService(
+                                                       ServiceReference<T> 
reference,
+                                                       
Tuple<ServiceObjects<T>> service) {
+
+                                                       
removedService(reference, service);
+
+                                                       
addingService(reference);
+                                               }
+
+                                               @Override
+                                               public void removedService(
+                                                       ServiceReference<T> 
reference,
+                                                       
Tuple<ServiceObjects<T>> tuple) {
+
+                                                       
removedSource.accept(tuple);
+                                               }
+                                       });
+
+                       return new OSGiResultImpl<>(
+                               added, removed, serviceTracker::open,
+                               serviceTracker::close);
+               });
+
+               _filterString = filterString;
+               _clazz = clazz;
+       }
+
+       /**
+        * Builds an {@link OSGiImpl} that, for every service reference tracked
+        * with the filter returned by {@code buildFilter}, runs the program
+        * {@code fun} produces from that reference's {@link ServiceObjects} and
+        * forwards the values the program emits.
+        *
+        * @param fun maps the {@link ServiceObjects} of a tracked reference to
+        *        the program to run while that reference is tracked
+        * @return the composed program; opening its result opens the underlying
+        *         {@link ServiceTracker} and closing it closes the tracker
+        */
+       @Override
+       public <S> OSGiImpl<S> flatMap(
+               Function<? super ServiceObjects<T>, OSGi<? extends S>> fun) {
+
+               return new OSGiImpl<>(bundleContext -> {
+                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+                       Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+                       Consumer<Tuple<S>> addedSource = added.getSource();
+
+                       Consumer<Tuple<S>> removedSource = removed.getSource();
+
+                       ServiceTrackerCustomizer<T, Tracked<ServiceObjects<T>, S>> customizer =
+                               new ServiceTrackerCustomizer<T, Tracked<ServiceObjects<T>, S>>() {
+
+                                       @Override
+                                       public Tracked<ServiceObjects<T>, S> addingService(
+                                               ServiceReference<T> reference) {
+
+                                               ServiceObjects<T> serviceObjects =
+                                                       bundleContext.getServiceObjects(reference);
+
+                                               if (serviceObjects == null) {
+                                                       // Nothing to hand to fun; returning null
+                                                       // tells the tracker not to track the
+                                                       // reference.
+                                                       return null;
+                                               }
+
+                                               Tracked<ServiceObjects<T>, S> tracked = new Tracked<>();
+
+                                               tracked.service = serviceObjects;
+
+                                               runProgram(tracked);
+
+                                               return tracked;
+                                       }
+
+                                       @Override
+                                       public void modifiedService(
+                                               ServiceReference<T> reference,
+                                               Tracked<ServiceObjects<T>, S> tracked) {
+
+                                               // The tracker keeps handing back this same
+                                               // Tracked instance for the reference, so the
+                                               // program is restarted in place. Creating a new
+                                               // Tracked here would leave a running program
+                                               // that removedService never sees.
+                                               closeProgram(tracked);
+
+                                               runProgram(tracked);
+                                       }
+
+                                       @Override
+                                       public void removedService(
+                                               ServiceReference<T> reference,
+                                               Tracked<ServiceObjects<T>, S> tracked) {
+
+                                               closeProgram(tracked);
+                                       }
+
+                                       /**
+                                        * Applies fun to the tracked ServiceObjects, runs the
+                                        * resulting program and records it in tracked.
+                                        */
+                                       private void runProgram(
+                                               Tracked<ServiceObjects<T>, S> tracked) {
+
+                                               OSGi<? extends S> program = fun.apply(tracked.service);
+
+                                               OSGiResult<? extends S> result = program.run(
+                                                       bundleContext, s -> {
+                                                               Tuple<S> tuple = Tuple.create(s);
+
+                                                               tracked.result = tuple;
+
+                                                               addedSource.accept(tuple);
+                                                       }
+                                               );
+
+                                               tracked.program = result;
+                                       }
+
+                                       /**
+                                        * Closes the recorded program and retracts the last
+                                        * value it published, if any.
+                                        */
+                                       private void closeProgram(
+                                               Tracked<ServiceObjects<T>, S> tracked) {
+
+                                               tracked.program.close();
+
+                                               if (tracked.result != null) {
+                                                       removedSource.accept(tracked.result);
+
+                                                       // Cleared so a later close cannot retract
+                                                       // the same value a second time.
+                                                       tracked.result = null;
+                                               }
+                                       }
+                               };
+
+                       ServiceTracker<T, Tracked<ServiceObjects<T>, S>> serviceTracker =
+                               new ServiceTracker<>(
+                                       bundleContext,
+                                       buildFilter(bundleContext, _filterString, _clazz),
+                                       customizer);
+
+                       return new OSGiResultImpl<>(
+                               added, removed, serviceTracker::open,
+                               serviceTracker::close);
+               });
+       }
+
+}

Added: 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java?rev=1771765&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
 (added)
+++ 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
 Mon Nov 28 16:42:38 2016
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.ServiceObjects;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * {@link OSGiImpl} backed by a {@link ServiceTracker}: it obtains each tracked
+ * service through {@link ServiceObjects#getService()}, publishes it, and hands
+ * it back with {@link ServiceObjects#ungetService(Object)} when the tracker
+ * reports the reference as removed.
+ *
+ * @author Carlos Sierra Andrés
+ */
+public class ServicesOSGi<T> extends OSGiImpl<T> {
+
+       private final String _filterString;
+
+       private final Class<T> _clazz;
+
+       /**
+        * @param clazz passed to {@code buildFilter} together with
+        *        {@code filterString} to select the services to track
+        * @param filterString passed to {@code buildFilter}
+        */
+       public ServicesOSGi(Class<T> clazz, String filterString) {
+               super(bundleContext -> {
+                       Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
+
+                       Pipe<Tuple<T>, Tuple<T>> removed = Pipe.create();
+
+                       Consumer<Tuple<T>> addedSource = added.getSource();
+
+                       Consumer<Tuple<T>> removedSource = removed.getSource();
+
+                       ServiceTrackerCustomizer<T, Tuple<T>> customizer =
+                               new ServiceTrackerCustomizer<T, Tuple<T>>() {
+
+                                       @Override
+                                       public Tuple<T> addingService(
+                                               ServiceReference<T> reference) {
+
+                                               ServiceObjects<T> serviceObjects =
+                                                       bundleContext.getServiceObjects(reference);
+
+                                               if (serviceObjects == null) {
+                                                       // Returning null tells the tracker not
+                                                       // to track this reference.
+                                                       return null;
+                                               }
+
+                                               T service = serviceObjects.getService();
+
+                                               if (service == null) {
+                                                       return null;
+                                               }
+
+                                               Tuple<T> tuple = Tuple.create(service);
+
+                                               addedSource.accept(tuple);
+
+                                               return tuple;
+                                       }
+
+                                       @Override
+                                       public void modifiedService(
+                                               ServiceReference<T> reference,
+                                               Tuple<T> tuple) {
+
+                                               // The tracker keeps this tuple as the tracked
+                                               // object and removedService will later unget
+                                               // tuple.t, so the same tuple is retracted and
+                                               // published again instead of obtaining a second
+                                               // service object nothing would ever release.
+                                               // NOTE(review): assumes the pipes accept a tuple
+                                               // being added again after removal - confirm
+                                               // against Pipe/Tuple.
+                                               removedSource.accept(tuple);
+
+                                               addedSource.accept(tuple);
+                                       }
+
+                                       @Override
+                                       public void removedService(
+                                               ServiceReference<T> reference, Tuple<T> tuple) {
+
+                                               ServiceObjects<T> serviceObjects =
+                                                       bundleContext.getServiceObjects(reference);
+
+                                               removedSource.accept(tuple);
+
+                                               // getServiceObjects may return null; in that
+                                               // case there is nothing to unget through.
+                                               if (serviceObjects != null) {
+                                                       serviceObjects.ungetService(tuple.t);
+                                               }
+                                       }
+                               };
+
+                       ServiceTracker<T, Tuple<T>> serviceTracker =
+                               new ServiceTracker<>(
+                                       bundleContext,
+                                       OSGiImpl.buildFilter(
+                                               bundleContext, filterString, clazz),
+                                       customizer);
+
+                       return new OSGiResultImpl<>(
+                               added, removed, serviceTracker::open,
+                               serviceTracker::close);
+               });
+
+               _filterString = filterString;
+
+               _clazz = clazz;
+       }
+
+       /**
+        * Builds an {@link OSGiImpl} that, for every tracked service, runs the
+        * program {@code fun} produces from the service object and forwards the
+        * values that program emits.
+        *
+        * @param fun maps a tracked service object to the program to run while
+        *        the service is tracked
+        * @return the composed program; opening its result opens the underlying
+        *         {@link ServiceTracker} and closing it closes the tracker
+        */
+       @Override
+       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+               return new OSGiImpl<>(bundleContext -> {
+                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+                       Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+                       Consumer<Tuple<S>> addedSource = added.getSource();
+
+                       Consumer<Tuple<S>> removedSource = removed.getSource();
+
+                       ServiceTrackerCustomizer<T, Tracked<T, S>> customizer =
+                               new ServiceTrackerCustomizer<T, Tracked<T, S>>() {
+
+                                       @Override
+                                       public Tracked<T, S> addingService(
+                                               ServiceReference<T> reference) {
+
+                                               ServiceObjects<T> serviceObjects =
+                                                       bundleContext.getServiceObjects(reference);
+
+                                               if (serviceObjects == null) {
+                                                       // Returning null tells the tracker not
+                                                       // to track this reference.
+                                                       return null;
+                                               }
+
+                                               T service = serviceObjects.getService();
+
+                                               if (service == null) {
+                                                       return null;
+                                               }
+
+                                               Tracked<T, S> tracked = new Tracked<>();
+
+                                               tracked.service = service;
+
+                                               runProgram(tracked);
+
+                                               return tracked;
+                                       }
+
+                                       @Override
+                                       public void modifiedService(
+                                               ServiceReference<T> reference,
+                                               Tracked<T, S> tracked) {
+
+                                               // Restart the program in place on the Tracked
+                                               // instance the tracker holds. The service object
+                                               // stays checked out, so removedService ungets it
+                                               // exactly once.
+                                               closeProgram(tracked);
+
+                                               runProgram(tracked);
+                                       }
+
+                                       @Override
+                                       public void removedService(
+                                               ServiceReference<T> reference,
+                                               Tracked<T, S> tracked) {
+
+                                               closeProgram(tracked);
+
+                                               ServiceObjects<T> serviceObjects =
+                                                       bundleContext.getServiceObjects(reference);
+
+                                               // getServiceObjects may return null; in that
+                                               // case there is nothing to unget through.
+                                               if (serviceObjects != null) {
+                                                       serviceObjects.ungetService(tracked.service);
+                                               }
+                                       }
+
+                                       /**
+                                        * Applies fun to the tracked service, runs the
+                                        * resulting program and records it in tracked.
+                                        */
+                                       private void runProgram(Tracked<T, S> tracked) {
+                                               OSGi<? extends S> program = fun.apply(tracked.service);
+
+                                               OSGiResult<? extends S> result = program.run(
+                                                       bundleContext, s -> {
+                                                               Tuple<S> tuple = Tuple.create(s);
+
+                                                               tracked.result = tuple;
+
+                                                               addedSource.accept(tuple);
+                                                       }
+                                               );
+
+                                               tracked.program = result;
+                                       }
+
+                                       /**
+                                        * Closes the recorded program and retracts the last
+                                        * value it published, if any.
+                                        */
+                                       private void closeProgram(Tracked<T, S> tracked) {
+                                               tracked.program.close();
+
+                                               if (tracked.result != null) {
+                                                       removedSource.accept(tracked.result);
+
+                                                       // Cleared so a later close cannot retract
+                                                       // the same value a second time.
+                                                       tracked.result = null;
+                                               }
+                                       }
+                               };
+
+                       ServiceTracker<T, Tracked<T, S>> serviceTracker =
+                               new ServiceTracker<>(
+                                       bundleContext,
+                                       buildFilter(
+                                               bundleContext, _filterString, _clazz),
+                                       customizer);
+
+                       return new OSGiResultImpl<>(
+                               added, removed, serviceTracker::open,
+                               serviceTracker::close);
+
+               });
+       }
+
+}

Modified: 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
 (original)
+++ 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
 Mon Nov 28 16:42:38 2016
@@ -25,7 +25,7 @@ import org.apache.aries.osgi.functional.
 class Tracked<T, S> {
 
        T service = null;
-       OSGiResult<S> program = null;
+       OSGiResult<? extends S> program = null;
 
        Tuple<S> result = null;
 

Modified: 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 (original)
+++ 
aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
 Mon Nov 28 16:42:38 2016
@@ -33,7 +33,7 @@ class Tuple<T> {
                this.t = t;
        }
 
-       public <S> Tuple<S> map(Function<T, S> fun) {
+       public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
                return new Tuple<>(original, fun.apply(t));
        }
 


Reply via email to