Author: csierra Date: Tue Apr 3 07:54:27 2018 New Revision: 1828205 URL: http://svn.apache.org/viewvc?rev=1828205&view=rev Log: Move to default methods
These implementations rely only on OSGiRunnable interface, so they can be moved to default methods Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1828205&r1=1828204&r2=1828205&view=diff ============================================================================== --- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original) +++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Tue Apr 3 07:54:27 2018 @@ -46,6 +46,7 @@ import org.apache.aries.functional.Funct import org.apache.aries.osgi.functional.internal.BundleContextOSGiImpl; import org.apache.aries.osgi.functional.internal.BundleOSGi; import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl; +import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList; import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl; import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl; import org.apache.aries.osgi.functional.internal.AllOSGi; @@ -56,6 +57,7 @@ import org.apache.aries.osgi.functional. import org.apache.aries.osgi.functional.internal.OSGiImpl; import org.apache.aries.osgi.functional.internal.OSGiResultImpl; import org.apache.aries.osgi.functional.internal.OnCloseOSGiImpl; +import org.apache.aries.osgi.functional.internal.Pad; import org.apache.aries.osgi.functional.internal.ServiceReferenceOSGi; import org.apache.aries.osgi.functional.internal.ServiceRegistrationOSGiImpl; import org.osgi.framework.Bundle; @@ -64,9 +66,12 @@ import org.osgi.framework.ServiceFactory import org.osgi.framework.ServiceObjects; import org.osgi.framework.ServiceRegistration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Dictionary; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -75,6 +80,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * @author Carlos Sierra Andrés @@ -396,67 +402,255 @@ public interface OSGi<T> extends OSGiRun )); } - default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) { - return fun.flatMap(this::map); + default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) { + return fromOsgiRunnable((bundleContext, op) -> { + ConcurrentDoublyLinkedList<T> identities = + new ConcurrentDoublyLinkedList<>(); + + ConcurrentDoublyLinkedList<Function<T, S>> funs = + new ConcurrentDoublyLinkedList<>(); + + OSGiResult myResult = run( + bundleContext, + t -> { + ConcurrentDoublyLinkedList.Node node = + identities.addLast(t); + + List<Runnable> terminators = funs.stream().map( + f -> op.apply(f.apply(t)) + ).collect( + Collectors.toList() + ); + + return () -> { + node.remove(); + + terminators.forEach(Runnable::run); + }; + } + ); + + OSGiResult funRun = fun.run( + bundleContext, + f -> { + ConcurrentDoublyLinkedList.Node node = funs.addLast(f); + + List<Runnable> terminators = identities.stream().map( + t -> op.apply(f.apply(t)) + ).collect( + Collectors.toList() + ); + + return () -> { + node.remove(); + + terminators.forEach(Runnable::run); + }; + }); + + return + () -> { + myResult.close(); + + funRun.close(); + }; + }); } - <S> OSGi<S> choose( + default <S> OSGi<S> choose( Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then, - Function<OSGi<T>, OSGi<S>> otherwise); + Function<OSGi<T>, OSGi<S>> otherwise) { + + return fromOsgiRunnable((bundleContext, publisher) -> { + Pad<T, S> thenPad = new Pad<>(bundleContext, then, publisher); + Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, publisher); + + OSGiResult result = run( + bundleContext, + t -> { + if (chooser.test(t)) { + return thenPad.publish(t); + } else { + return elsePad.publish(t); + } + }); + return () -> { + thenPad.close(); + elsePad.close(); + result.close(); + }; + }); + } + + default <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) { + return fromOsgiRunnable((bundleContext, publisher) -> { + List<Pad<T, S>> pads = + Arrays.stream( + funs + ).map( + fun -> new Pad<>(bundleContext, fun, publisher) + ).collect( + Collectors.toList() + ); + + OSGiResult result = run( + bundleContext, + t -> { + List<Runnable> terminators = + pads.stream().map(p -> p.publish(t)).collect( + Collectors.toList()); + + return () -> terminators.forEach(Runnable::run); + }); + + return () -> { + result.close(); + + pads.forEach(Pad::close); + }; + }); + } + + default OSGi<T> effects( + Consumer<? super T> onAdded, Consumer<? super T> onRemoved) { + + return fromOsgiRunnable((bundleContext, op) -> + run( + bundleContext, + t -> { + onAdded.accept(t); + + Runnable terminator; + try { + terminator = op.apply(t); + } + catch (Exception e) { + onRemoved.accept(t); + + throw e; + } - <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs); + return () -> { + onRemoved.accept(t); - OSGi<T> effects(Consumer<? super T> onAdded, Consumer<? super T> onRemoved); + terminator.run(); + }; + })); + } default OSGi<T> effects(Effect<? super T> effect) { return effects(effect.getOnIncoming(), effect.getOnLeaving()); } default OSGi<T> filter(Predicate<T> predicate) { - return flatMap(t -> { - if (predicate.test(t)) { - return just(t); - } - else { - return nothing(); - } - }); + return fromOsgiRunnable((bundleContext, op) -> + run( + bundleContext, + t -> { + if (predicate.test(t)) { + return op.apply(t); + } + else { + return NOOP; + } + } + )); } - <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun); + default <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) { + return fromOsgiRunnable((bundleContext, op) -> + run(bundleContext, t -> fun.apply(t).run(bundleContext, op)) + ); + } - default public OSGi<Void> foreach(Consumer<? super T> onAdded) { + default OSGi<Void> foreach(Consumer<? super T> onAdded) { return foreach(onAdded, __ -> {}); } - default public OSGi<Void> foreach( + default OSGi<Void> foreach( Consumer<? super T> onAdded, Consumer<? super T> onRemoved) { return ignore(effects(onAdded, onRemoved)); } - <S> OSGi<S> map(Function<? super T, ? extends S> function); + default <S> OSGi<S> map(Function<? super T, ? extends S> function) { + return fromOsgiRunnable((bundleContext, op) -> + run(bundleContext, t -> op.apply(function.apply(t))) + ); + } + + default OSGi<T> recover(BiFunction<T, Exception, T> onError) { + return fromOsgiRunnable((bundleContext, op) -> + run( + bundleContext, + t -> { + try { + return op.apply(t); + } + catch (Exception e) { + return op.apply(onError.apply(t, e)); + } + } + )); + } + + default OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) { + return fromOsgiRunnable((bundleContext, op) -> + run( + bundleContext, + t -> { + try { + return op.apply(t); + } + catch (Exception e) { + return onError.apply(t, e).run(bundleContext, op); + } + } + )); + } + + default <K, S> OSGi<S> splitBy( + Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) { - OSGi<T> recover(BiFunction<T, Exception, T> onError); + return fromOsgiRunnable((bundleContext, op) -> { + HashMap<K, Pad<T, S>> pads = new HashMap<>(); - OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError); + OSGiResult result = run( + bundleContext, + t -> mapper.apply(t).run( + bundleContext, + k -> pads.computeIfAbsent( + k, + __ -> new Pad<>( + bundleContext, + ___ -> fun.apply(k, ___), op) + ).publish(t) + ) + ); - <K, S> OSGi<S> splitBy( - Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun); + return () -> { + pads.values().forEach(Pad::close); + + result.close(); + }; + }); + } default public <S> OSGi<S> then(OSGi<S> next) { return flatMap(__ -> next); } - <S> OSGi<S> transform(Transformer<T, S> fun); + default <S> OSGi<S> transform(Transformer<T, S> fun) { + return fromOsgiRunnable( + (bundleContext, op) -> run(bundleContext, fun.transform(op))); + } static <T> OSGi<T> fromOsgiRunnable(OSGiRunnable<T> runnable) { - return getOsgiFactory().create( - (b, op) -> new OSGiResultImpl(runnable.run(b, op))); + return getOsgiFactory().create(runnable); } static OSGiFactory getOsgiFactory() { - return OSGiImpl::new; + return OSGiImpl::create; } } Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1828205&r1=1828204&r2=1828205&view=diff ============================================================================== --- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original) +++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Tue Apr 3 07:54:27 2018 @@ -18,275 +18,26 @@ package org.apache.aries.osgi.functional.internal; import org.apache.aries.osgi.functional.OSGi; +import org.apache.aries.osgi.functional.OSGiFactory; import org.apache.aries.osgi.functional.OSGiResult; import org.apache.aries.osgi.functional.OSGiRunnable; import org.apache.aries.osgi.functional.Publisher; -import org.apache.aries.osgi.functional.Transformer; -import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node; import org.osgi.framework.BundleContext; import org.osgi.framework.Filter; import org.osgi.framework.InvalidSyntaxException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - /** * @author Carlos Sierra Andrés */ public class OSGiImpl<T> implements OSGi<T> { - public OSGiImpl(OSGiRunnable<T> operation) { + protected OSGiImpl(OSGiRunnable<T> operation) { _operation = operation; } - @Override - public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) { - return new OSGiImpl<>( - (bundleContext, op) -> { - ConcurrentDoublyLinkedList<T> identities = - new ConcurrentDoublyLinkedList<>(); - - ConcurrentDoublyLinkedList<Function<T, S>> funs = - new ConcurrentDoublyLinkedList<>(); - - OSGiResult myResult = run( - bundleContext, - t -> { - Node node = identities.addLast(t); - - List<Runnable> terminators = funs.stream().map( - f -> op.apply(f.apply(t)) - ).collect( - Collectors.toList() - ); - - return () -> { - node.remove(); - - terminators.forEach(Runnable::run); - }; - } - ); - - OSGiResult funRun = fun.run( - bundleContext, - f -> { - Node node = funs.addLast(f); - - List<Runnable> terminators = identities.stream().map( - t -> op.apply(f.apply(t)) - ).collect( - Collectors.toList() - ); - - return () -> { - node.remove(); - - terminators.forEach(Runnable::run); - }; - }); - - return new OSGiResultImpl( - () -> { - myResult.close(); - - funRun.close(); - }); - }); - } - - @Override - public <S> OSGi<S> choose( - Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then, - Function<OSGi<T>, OSGi<S>> otherwise) { - - return new OSGiImpl<>((bundleContext, publisher) -> { - Pad<T, S> thenPad = new Pad<>(bundleContext, then, publisher); - Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, publisher); - - OSGiResult result = run( - bundleContext, - t -> { - if (chooser.test(t)) { - return thenPad.publish(t); - } else { - return elsePad.publish(t); - } - }); - return new OSGiResultImpl( - () -> { - thenPad.close(); - elsePad.close(); - result.close(); - }); - }); - } - - @Override - @SafeVarargs - public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) { - return new OSGiImpl<>((bundleContext, publisher) -> { - List<Pad<T, S>> pads = - Arrays.stream( - funs - ).map( - fun -> new Pad<>(bundleContext, fun, publisher) - ).collect( - Collectors.toList() - ); - - OSGiResult result = run( - bundleContext, - t -> { - List<Runnable> terminators = - pads.stream().map(p -> p.publish(t)).collect( - Collectors.toList()); - - return () -> terminators.forEach(Runnable::run); - }); - - return new OSGiResultImpl( - () -> { - result.close(); - - pads.forEach(Pad::close); - }); - }); - } - - @Override - public OSGi<T> effects( - Consumer<? super T> onAdded, Consumer<? super T> onRemoved) { - - return new OSGiImpl<>((bundleContext, op) -> - run( - bundleContext, - t -> { - onAdded.accept(t); - - Runnable terminator; - try { - terminator = op.apply(t); - } - catch (Exception e) { - onRemoved.accept(t); - - throw e; - } - - return () -> { - onRemoved.accept(t); - - terminator.run(); - }; - })); - } - - @Override - public OSGi<T> filter(Predicate<T> predicate) { - return new OSGiImpl<>((bundleContext, op) -> - run( - bundleContext, - t -> { - if (predicate.test(t)) { - return op.apply(t); - } - else { - return NOOP; - } - } - )); - } - - @Override - public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) { - return new OSGiImpl<>((bundleContext, op) -> - run(bundleContext, t -> fun.apply(t).run(bundleContext, op)) - ); - } - - @Override - public <S> OSGi<S> map(Function<? super T, ? extends S> function) { - return new OSGiImpl<>((bundleContext, op) -> - run(bundleContext, t -> op.apply(function.apply(t))) - ); - } - - @Override - public OSGi<T> recover(BiFunction<T, Exception, T> onError) { - return new OSGiImpl<>((bundleContext, op) -> - run( - bundleContext, - t -> { - try { - return op.apply(t); - } - catch (Exception e) { - return op.apply(onError.apply(t, e)); - } - } - )); - } - - @Override - public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) { - return new OSGiImpl<>((bundleContext, op) -> - run( - bundleContext, - t -> { - try { - return op.apply(t); - } - catch (Exception e) { - return onError.apply(t, e).run(bundleContext, op); - } - } - )); - } - - @Override - public <K, S> OSGi<S> splitBy( - Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) { - - return new OSGiImpl<>((bundleContext, op) -> { - HashMap<K, Pad<T, S>> pads = new HashMap<>(); - - OSGiResult result = run( - bundleContext, - t -> mapper.apply(t).run( - bundleContext, - k -> pads.computeIfAbsent( - k, - __ -> new Pad<>( - bundleContext, - ___ -> fun.apply(k, ___), op) - ).publish(t) - ) - ); - - return new OSGiResultImpl( - () -> { - pads.values().forEach(Pad::close); - - result.close(); - }); - }); - } - - @Override - public <S> OSGi<S> transform(Transformer<T, S> fun) { + public static <T> OSGi<T> create(OSGiRunnable<T> runnable) { return new OSGiImpl<>( - (bundleContext, op) -> { - OSGiResult result = run(bundleContext, fun.transform(op)); - - return new OSGiResultImpl(result); - }); + (b, op) -> new OSGiResultImpl(runnable.run(b, op))); } @Override