Author: csierra
Date: Tue Apr  3 07:54:27 2018
New Revision: 1828205

URL: http://svn.apache.org/viewvc?rev=1828205&view=rev
Log:
Move to default methods

These implementations rely only on the OSGiRunnable interface, so they can
be moved to default methods of the OSGi interface.

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1828205&r1=1828204&r2=1828205&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Tue Apr  3 07:54:27 2018
@@ -46,6 +46,7 @@ import org.apache.aries.functional.Funct
 import org.apache.aries.osgi.functional.internal.BundleContextOSGiImpl;
 import org.apache.aries.osgi.functional.internal.BundleOSGi;
 import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl;
+import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
 import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl;
 import org.apache.aries.osgi.functional.internal.AllOSGi;
@@ -56,6 +57,7 @@ import org.apache.aries.osgi.functional.
 import org.apache.aries.osgi.functional.internal.OSGiImpl;
 import org.apache.aries.osgi.functional.internal.OSGiResultImpl;
 import org.apache.aries.osgi.functional.internal.OnCloseOSGiImpl;
+import org.apache.aries.osgi.functional.internal.Pad;
 import org.apache.aries.osgi.functional.internal.ServiceReferenceOSGi;
 import org.apache.aries.osgi.functional.internal.ServiceRegistrationOSGiImpl;
 import org.osgi.framework.Bundle;
@@ -64,9 +66,12 @@ import org.osgi.framework.ServiceFactory
 import org.osgi.framework.ServiceObjects;
 import org.osgi.framework.ServiceRegistration;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -75,6 +80,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * @author Carlos Sierra Andrés
@@ -396,67 +402,255 @@ public interface OSGi<T> extends OSGiRun
                ));
        }
 
-       default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-               return fun.flatMap(this::map);
+       default  <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+               return fromOsgiRunnable((bundleContext, op) -> {
+                       ConcurrentDoublyLinkedList<T> identities =
+                               new ConcurrentDoublyLinkedList<>();
+
+                       ConcurrentDoublyLinkedList<Function<T, S>> funs =
+                               new ConcurrentDoublyLinkedList<>();
+
+                       OSGiResult myResult = run(
+                               bundleContext,
+                               t -> {
+                                       ConcurrentDoublyLinkedList.Node node =
+                                               identities.addLast(t);
+
+                                       List<Runnable> terminators = 
funs.stream().map(
+                                               f -> op.apply(f.apply(t))
+                                       ).collect(
+                                               Collectors.toList()
+                                       );
+
+                                       return () -> {
+                                               node.remove();
+
+                                               
terminators.forEach(Runnable::run);
+                                       };
+                               }
+                       );
+
+                       OSGiResult funRun = fun.run(
+                               bundleContext,
+                               f -> {
+                                       ConcurrentDoublyLinkedList.Node node = 
funs.addLast(f);
+
+                                       List<Runnable> terminators = 
identities.stream().map(
+                                               t -> op.apply(f.apply(t))
+                                       ).collect(
+                                               Collectors.toList()
+                                       );
+
+                                       return () -> {
+                                               node.remove();
+
+                                               
terminators.forEach(Runnable::run);
+                                       };
+                               });
+
+                       return
+                               () -> {
+                                       myResult.close();
+
+                                       funRun.close();
+                               };
+               });
        }
 
-       <S> OSGi<S> choose(
+       default <S> OSGi<S> choose(
                Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
-               Function<OSGi<T>, OSGi<S>> otherwise);
+               Function<OSGi<T>, OSGi<S>> otherwise) {
+
+               return fromOsgiRunnable((bundleContext, publisher) -> {
+                       Pad<T, S> thenPad = new Pad<>(bundleContext, then, 
publisher);
+                       Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, 
publisher);
+
+                       OSGiResult result = run(
+                               bundleContext,
+                               t -> {
+                                       if (chooser.test(t)) {
+                                               return thenPad.publish(t);
+                                       } else {
+                                               return elsePad.publish(t);
+                                       }
+                               });
+                       return () -> {
+                               thenPad.close();
+                               elsePad.close();
+                               result.close();
+                       };
+               });
+       }
+
+       default <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) {
+               return fromOsgiRunnable((bundleContext, publisher) -> {
+                       List<Pad<T, S>> pads =
+                               Arrays.stream(
+                                       funs
+                               ).map(
+                                       fun -> new Pad<>(bundleContext, fun, 
publisher)
+                               ).collect(
+                                       Collectors.toList()
+                               );
+
+                       OSGiResult result = run(
+                               bundleContext,
+                               t -> {
+                                       List<Runnable> terminators =
+                                               pads.stream().map(p -> 
p.publish(t)).collect(
+                                                       Collectors.toList());
+
+                                       return () -> 
terminators.forEach(Runnable::run);
+                               });
+
+                       return () -> {
+                               result.close();
+
+                               pads.forEach(Pad::close);
+                       };
+               });
+       }
+
+       default OSGi<T> effects(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(
+                               bundleContext,
+                               t -> {
+                                       onAdded.accept(t);
+
+                                       Runnable terminator;
+                                       try {
+                                               terminator = op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               onRemoved.accept(t);
+
+                                               throw e;
+                                       }
 
-       <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+                                       return () -> {
+                                               onRemoved.accept(t);
 
-       OSGi<T> effects(Consumer<? super T> onAdded, Consumer<? super T> 
onRemoved);
+                                               terminator.run();
+                                       };
+                               }));
+       }
 
        default OSGi<T> effects(Effect<? super T> effect) {
                return effects(effect.getOnIncoming(), effect.getOnLeaving());
        }
 
        default OSGi<T> filter(Predicate<T> predicate) {
-               return flatMap(t -> {
-                       if (predicate.test(t)) {
-                               return just(t);
-                       }
-                       else {
-                               return nothing();
-                       }
-               });
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(
+                               bundleContext,
+                               t -> {
+                                       if (predicate.test(t)) {
+                                               return op.apply(t);
+                                       }
+                                       else {
+                                               return NOOP;
+                                       }
+                               }
+                       ));
        }
 
-       <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
+       default <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) 
{
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(bundleContext, t -> fun.apply(t).run(bundleContext, 
op))
+               );
+       }
 
-       default public OSGi<Void> foreach(Consumer<? super T> onAdded) {
+       default OSGi<Void> foreach(Consumer<? super T> onAdded) {
                return foreach(onAdded, __ -> {});
        }
 
-       default public OSGi<Void> foreach(
+       default OSGi<Void> foreach(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
                return ignore(effects(onAdded, onRemoved));
        }
 
-       <S> OSGi<S> map(Function<? super T, ? extends S> function);
+       default <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(bundleContext, t -> op.apply(function.apply(t)))
+               );
+       }
+
+       default OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(
+                               bundleContext,
+                               t -> {
+                                       try {
+                                               return op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               return 
op.apply(onError.apply(t, e));
+                                       }
+                               }
+                       ));
+       }
+
+       default OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+               return fromOsgiRunnable((bundleContext, op) ->
+                       run(
+                               bundleContext,
+                               t -> {
+                                       try {
+                                               return op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               return onError.apply(t, 
e).run(bundleContext, op);
+                                       }
+                               }
+                       ));
+       }
+
+       default <K, S> OSGi<S> splitBy(
+               Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> 
fun) {
 
-       OSGi<T> recover(BiFunction<T, Exception, T> onError);
+               return fromOsgiRunnable((bundleContext, op) -> {
+                       HashMap<K, Pad<T, S>> pads = new HashMap<>();
 
-       OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+                       OSGiResult result = run(
+                               bundleContext,
+                               t -> mapper.apply(t).run(
+                                       bundleContext,
+                                       k -> pads.computeIfAbsent(
+                                               k,
+                                               __ -> new Pad<>(
+                                                       bundleContext,
+                                                       ___ -> fun.apply(k, 
___), op)
+                                       ).publish(t)
+                               )
+                       );
 
-       <K, S> OSGi<S> splitBy(
-               Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> 
fun);
+                       return () -> {
+                               pads.values().forEach(Pad::close);
+
+                               result.close();
+                       };
+               });
+       }
 
        default public <S> OSGi<S> then(OSGi<S> next) {
                return flatMap(__ -> next);
        }
 
-       <S> OSGi<S> transform(Transformer<T, S> fun);
+       default <S> OSGi<S> transform(Transformer<T, S> fun) {
+               return fromOsgiRunnable(
+                       (bundleContext, op) -> run(bundleContext, 
fun.transform(op)));
+       }
 
        static <T> OSGi<T> fromOsgiRunnable(OSGiRunnable<T> runnable) {
-               return getOsgiFactory().create(
-                       (b, op) -> new OSGiResultImpl(runnable.run(b, op)));
+               return getOsgiFactory().create(runnable);
        }
 
        static OSGiFactory getOsgiFactory() {
-               return OSGiImpl::new;
+               return OSGiImpl::create;
        }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1828205&r1=1828204&r2=1828205&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Tue Apr  3 07:54:27 2018
@@ -18,275 +18,26 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiFactory;
 import org.apache.aries.osgi.functional.OSGiResult;
 import org.apache.aries.osgi.functional.OSGiRunnable;
 import org.apache.aries.osgi.functional.Publisher;
-import org.apache.aries.osgi.functional.Transformer;
-import 
org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public class OSGiImpl<T> implements OSGi<T> {
 
-       public OSGiImpl(OSGiRunnable<T> operation) {
+       protected OSGiImpl(OSGiRunnable<T> operation) {
                _operation = operation;
        }
 
-       @Override
-       public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-               return new OSGiImpl<>(
-                       (bundleContext, op) -> {
-                               ConcurrentDoublyLinkedList<T> identities =
-                                       new ConcurrentDoublyLinkedList<>();
-
-                               ConcurrentDoublyLinkedList<Function<T, S>> funs 
=
-                                       new ConcurrentDoublyLinkedList<>();
-
-                               OSGiResult myResult = run(
-                                       bundleContext,
-                                       t -> {
-                                               Node node = 
identities.addLast(t);
-
-                                               List<Runnable> terminators = 
funs.stream().map(
-                                                       f -> 
op.apply(f.apply(t))
-                                               ).collect(
-                                                       Collectors.toList()
-                                               );
-
-                                               return () -> {
-                                                       node.remove();
-
-                                                       
terminators.forEach(Runnable::run);
-                                               };
-                                       }
-                               );
-
-                               OSGiResult funRun = fun.run(
-                                       bundleContext,
-                                       f -> {
-                                               Node node = funs.addLast(f);
-
-                                               List<Runnable> terminators = 
identities.stream().map(
-                                                       t -> 
op.apply(f.apply(t))
-                                               ).collect(
-                                                       Collectors.toList()
-                                               );
-
-                                               return () -> {
-                                                       node.remove();
-
-                                                       
terminators.forEach(Runnable::run);
-                                               };
-                                       });
-
-                               return new OSGiResultImpl(
-                                       () -> {
-                                               myResult.close();
-
-                                               funRun.close();
-                                       });
-                       });
-       }
-
-       @Override
-       public <S> OSGi<S> choose(
-               Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
-               Function<OSGi<T>, OSGi<S>> otherwise) {
-
-               return new OSGiImpl<>((bundleContext, publisher) -> {
-                       Pad<T, S> thenPad = new Pad<>(bundleContext, then, 
publisher);
-                       Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, 
publisher);
-
-                       OSGiResult result = run(
-                               bundleContext,
-                               t -> {
-                                       if (chooser.test(t)) {
-                                               return thenPad.publish(t);
-                                       } else {
-                                               return elsePad.publish(t);
-                                       }
-                               });
-                       return new OSGiResultImpl(
-                               () -> {
-                                       thenPad.close();
-                                       elsePad.close();
-                                       result.close();
-                               });
-               });
-       }
-
-       @Override
-       @SafeVarargs
-       public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... 
funs) {
-               return new OSGiImpl<>((bundleContext, publisher) -> {
-                       List<Pad<T, S>> pads =
-                               Arrays.stream(
-                                       funs
-                               ).map(
-                                       fun -> new Pad<>(bundleContext, fun, 
publisher)
-                               ).collect(
-                                       Collectors.toList()
-                       );
-
-                       OSGiResult result = run(
-                               bundleContext,
-                               t -> {
-                                       List<Runnable> terminators =
-                                               pads.stream().map(p -> 
p.publish(t)).collect(
-                                                       Collectors.toList());
-
-                                       return () -> 
terminators.forEach(Runnable::run);
-                               });
-
-                       return new OSGiResultImpl(
-                               () -> {
-                                       result.close();
-
-                                       pads.forEach(Pad::close);
-                               });
-               });
-       }
-
-       @Override
-       public OSGi<T> effects(
-               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
-
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(
-                               bundleContext,
-                               t -> {
-                                       onAdded.accept(t);
-
-                                       Runnable terminator;
-                                       try {
-                                               terminator = op.apply(t);
-                                       }
-                                       catch (Exception e) {
-                                               onRemoved.accept(t);
-
-                                               throw e;
-                                       }
-
-                                       return () -> {
-                                               onRemoved.accept(t);
-
-                                               terminator.run();
-                                       };
-                               }));
-       }
-
-       @Override
-       public OSGi<T> filter(Predicate<T> predicate) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(
-                               bundleContext,
-                               t -> {
-                                       if (predicate.test(t)) {
-                                               return op.apply(t);
-                                       }
-                                       else {
-                                               return NOOP;
-                                       }
-                               }
-                       ));
-       }
-
-       @Override
-       public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(bundleContext, t -> fun.apply(t).run(bundleContext, 
op))
-               );
-       }
-
-       @Override
-       public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(bundleContext, t -> op.apply(function.apply(t)))
-               );
-       }
-
-       @Override
-       public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(
-                               bundleContext,
-                               t -> {
-                                       try {
-                                               return op.apply(t);
-                                       }
-                                       catch (Exception e) {
-                                               return 
op.apply(onError.apply(t, e));
-                                       }
-                               }
-                       ));
-       }
-
-       @Override
-       public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       run(
-                               bundleContext,
-                               t -> {
-                                       try {
-                                               return op.apply(t);
-                                       }
-                                       catch (Exception e) {
-                                               return onError.apply(t, 
e).run(bundleContext, op);
-                                       }
-                               }
-                       ));
-       }
-
-       @Override
-       public <K, S> OSGi<S> splitBy(
-               Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> 
fun) {
-
-               return new OSGiImpl<>((bundleContext, op) -> {
-                       HashMap<K, Pad<T, S>> pads = new HashMap<>();
-
-                       OSGiResult result = run(
-                               bundleContext,
-                               t -> mapper.apply(t).run(
-                                       bundleContext,
-                                       k -> pads.computeIfAbsent(
-                                               k,
-                                               __ -> new Pad<>(
-                                                       bundleContext,
-                                                       ___ -> fun.apply(k, 
___), op)
-                                       ).publish(t)
-                               )
-                       );
-
-                       return new OSGiResultImpl(
-                               () -> {
-                                       pads.values().forEach(Pad::close);
-
-                                       result.close();
-                               });
-               });
-       }
-
-       @Override
-       public <S> OSGi<S> transform(Transformer<T, S> fun) {
+       public static <T> OSGi<T> create(OSGiRunnable<T> runnable) {
                return new OSGiImpl<>(
-                       (bundleContext, op) -> {
-                               OSGiResult result = run(bundleContext, 
fun.transform(op));
-
-                               return new OSGiResultImpl(result);
-               });
+                       (b, op) -> new OSGiResultImpl(runnable.run(b, op)));
        }
 
        @Override


Reply via email to