Author: csierra
Date: Mon Aug 28 18:37:29 2017
New Revision: 1806481

URL: http://svn.apache.org/viewvc?rev=1806481&view=rev
Log:
Optimize flatMap to use ServiceTracker

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1806481&r1=1806480&r2=1806481&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
 Mon Aug 28 18:37:29 2017
@@ -17,16 +17,26 @@
 
 package org.apache.aries.osgi.functional.internal;
 
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceObjects;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.internal.OSGiImpl.buildFilter;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class ServiceReferenceOSGi<T>
-       extends OSGiImpl<ServiceReference<T>> {
+public class ServiceReferenceOSGi<T> extends OSGiImpl<ServiceReference<T>> {
+
+       private String _filterString;
+       private Class<T> _clazz;
 
        public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
                super(bundleContext -> {
@@ -43,33 +53,42 @@ public class ServiceReferenceOSGi<T>
                                removed.getSource();
 
                        ServiceTracker<T, Tuple<ServiceReference<T>>> 
serviceTracker =
-                               new ServiceTracker<T, 
Tuple<ServiceReference<T>>>(
+                               new ServiceTracker<>(
                                        bundleContext,
-                                       OSGiImpl.buildFilter(
-                                               bundleContext, filterString, 
clazz), null) {
+                                       buildFilter(bundleContext, 
filterString, clazz),
+                                       new DefaultServiceTrackerCustomizer<>(
+                                               addedSource, removedSource));
 
-                                       @Override
-                                       public Tuple<ServiceReference<T>> 
addingService(
-                                               ServiceReference<T> reference) {
+                       return new OSGiResultImpl<>(
+                               added, removed, serviceTracker::open,
+                               serviceTracker::close);
 
-                                               Tuple<ServiceReference<T>> 
tuple = Tuple.create(
-                                                       reference);
+               });
 
-                                               addedSource.accept(tuple);
+               _filterString = filterString;
+               _clazz = clazz;
+       }
+
+       @Override
+       public <S> OSGiImpl<S> flatMap(
+               Function<? super ServiceReference<T>, OSGi<? extends S>> fun) {
 
-                                               return tuple;
-                                       }
+               return new OSGiImpl<>(bundleContext -> {
+                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
 
-                                       @Override
-                                       public void removedService(
-                                               ServiceReference<T> reference,
-                                               Tuple<ServiceReference<T>> t) {
+                       Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
 
-                                               super.removedService(reference, 
t);
+                       Consumer<Tuple<S>> addedSource = added.getSource();
 
-                                               removedSource.accept(t);
-                                       }
-                               };
+                       Consumer<Tuple<S>> removedSource = removed.getSource();
+
+                       ServiceTracker<T, Tracked<T, S>> serviceTracker =
+                               new ServiceTracker<>(
+                                       bundleContext,
+                                       buildFilter(
+                                               bundleContext, _filterString, 
_clazz),
+                                               new 
FlatMapServiceTrackerCustomizer<>(
+                                                       fun, bundleContext, 
addedSource, removedSource));
 
                        return new OSGiResultImpl<>(
                                added, removed, serviceTracker::open,
@@ -78,4 +97,104 @@ public class ServiceReferenceOSGi<T>
                });
        }
 
+       private static class DefaultServiceTrackerCustomizer<T>
+               implements ServiceTrackerCustomizer<T, 
Tuple<ServiceReference<T>>> {
+
+               private final Consumer<Tuple<ServiceReference<T>>> _addedSource;
+               private final Consumer<Tuple<ServiceReference<T>>> 
_removedSource;
+
+               public DefaultServiceTrackerCustomizer(
+                       Consumer<Tuple<ServiceReference<T>>> addedSource,
+                       Consumer<Tuple<ServiceReference<T>>> removedSource) {
+
+                       _addedSource = addedSource;
+                       _removedSource = removedSource;
+               }
+
+               @Override
+               public Tuple<ServiceReference<T>> addingService(
+                       ServiceReference<T> reference) {
+
+                       Tuple<ServiceReference<T>> tuple = 
Tuple.create(reference);
+
+                       _addedSource.accept(tuple);
+
+                       return tuple;
+               }
+
+               @Override
+               public void modifiedService(
+                       ServiceReference<T> reference, 
Tuple<ServiceReference<T>> service) {
+
+               }
+
+               @Override
+               public void removedService(
+                       ServiceReference<T> reference, 
Tuple<ServiceReference<T>> tuple) {
+
+                       _removedSource.accept(tuple);
+               }
+       }
+
+       private static class FlatMapServiceTrackerCustomizer<T, S>
+               implements ServiceTrackerCustomizer<T, Tracked<T, S>> {
+               private final Function<? super ServiceReference<T>, OSGi<? 
extends S>>
+                       _fun;
+               private final BundleContext _bundleContext;
+               private final Consumer<Tuple<S>> _addedSource;
+
+               private final Consumer<Tuple<S>> _removedSource;
+
+               public FlatMapServiceTrackerCustomizer(
+                       Function<? super ServiceReference<T>, OSGi<? extends 
S>> fun,
+                       BundleContext bundleContext, Consumer<Tuple<S>> 
addedSource,
+                       Consumer<Tuple<S>> removedSource) {
+
+                       _fun = fun;
+                       _bundleContext = bundleContext;
+                       _addedSource = addedSource;
+                       _removedSource = removedSource;
+               }
+
+               @Override
+        public Tracked<T, S> addingService(ServiceReference<T> reference) {
+            OSGi<? extends S> program = _fun.apply(reference);
+
+            Tracked<T, S> tracked = new Tracked<>();
+
+            tracked.program = program.run(
+                               _bundleContext, s -> {
+                    Tuple<S> tuple = Tuple.create(s);
+
+                    tracked.result = tuple;
+
+                    _addedSource.accept(tuple);
+                }
+            );
+
+            return tracked;
+        }
+
+               @Override
+        public void modifiedService(
+               ServiceReference<T> reference, Tracked<T, S> tracked) {
+
+            removedService(reference, tracked);
+
+            addingService(reference);
+        }
+
+               @Override
+        public void removedService(
+            ServiceReference<T> reference, Tracked<T, S> tracked) {
+
+            tracked.program.close();
+
+            if (tracked.result != null) {
+                _removedSource.accept(tracked.result);
+            }
+        }
+
+       }
+
 }


Reply via email to