Author: csierra
Date: Fri Nov 17 15:59:48 2017
New Revision: 1815577

URL: http://svn.apache.org/viewvc?rev=1815577&view=rev
Log:
[Component-DSL] Error handling

Adding some primitives that help recover from errors

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Fri Nov 17 15:59:48 2017
@@ -47,6 +47,7 @@ import java.util.Dictionary;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -58,6 +59,10 @@ import java.util.function.Supplier;
 public interface OSGi<T> extends OSGiRunnable<T> {
        Runnable NOOP = () -> {};
 
+       OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+       OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
        OSGi<T> effects(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
 
@@ -254,8 +259,6 @@ public interface OSGi<T> extends OSGiRun
                return new ServiceReferenceOSGi<>(filterString, null, onModified);
        }
 
-
-
        @SafeVarargs
        static <T> OSGi<T> all(OSGi<T> ... programs) {
                return new DistributeOSGi<>(programs);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 Fri Nov 17 15:59:48 2017
@@ -44,7 +44,12 @@ public class DistributeOSGi<T> extends O
                                 bundleContext, op)).
                             collect(Collectors.toList()));
 
-                    results.forEach(OSGiResult::start);
+                    results.forEach(osGiResult -> {
+                        try {
+                            osGiResult.start();
+                        }
+                        catch (Exception e) {}
+                    });
                 },
                 () -> {
                     for (OSGiResult result : results) {

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Fri Nov 17 15:59:48 2017
@@ -26,6 +26,7 @@ import org.osgi.framework.InvalidSyntaxE
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -60,6 +61,46 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
+       public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(
+                               bundleContext,
+                               t -> {
+                                       try {
+                                               return op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               return op.apply(onError.apply(t, e));
+                                       }
+                               }
+                       ));
+       }
+
+       @Override
+       public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(
+                               bundleContext,
+                               t -> {
+                                       try {
+                                               return op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               OSGi<T> errorProgram = onError.apply(t, e);
+
+                                               OSGiResult result =
+                                                       ((OSGiImpl<T>) errorProgram)._operation.run(
+                                                               bundleContext, op);
+
+                                               result.start();
+
+                                               return result::close;
+                                       }
+                               }
+                       ));
+       }
+
+       @Override
        public OSGi<T> effects(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
@@ -69,7 +110,15 @@ public class OSGiImpl<T> implements OSGi
                                t -> {
                                        onAdded.accept(t);
 
-                                       Runnable terminator = op.apply(t);
+                                       Runnable terminator;
+                                       try {
+                                               terminator = op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               onRemoved.accept(t);
+
+                                               throw e;
+                                       }
 
                                        return () -> {
                                                onRemoved.accept(t);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
 Fri Nov 17 15:59:48 2017
@@ -19,10 +19,8 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiOperation;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 Fri Nov 17 15:59:48 2017
@@ -43,14 +43,9 @@ public class OSGiResultImpl implements O
                if (_working.compareAndSet(false, true)) {
 
                        if (!_started && !_closed.get()) {
-                               try {
-                                       start.run();
+                               start.run();
 
-                                       _started = true;
-                               }
-                               catch (Exception e) {
-                                       e.printStackTrace();
-                               }
+                               _started = true;
                        }
 
                        _working.set(false);
@@ -65,12 +60,7 @@ public class OSGiResultImpl implements O
                }
 
                if (_closed.compareAndSet(false, true) && _started) {
-                       try {
-                               close.run();
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                       }
+                       close.run();
                }
 
                _working.set(false);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 Fri Nov 17 15:59:48 2017
@@ -29,7 +29,16 @@ public class OnCloseOSGiImpl extends OSG
                        AtomicReference<Runnable> reference = new AtomicReference<>();
 
                        return new OSGiResultImpl(
-                               () -> reference.set(op.apply(null)),
+                               () -> {
+                                       try {
+                                               reference.set(op.apply(null));
+                                       }
+                                       catch (Exception e) {
+                                               action.run();
+
+                                               throw e;
+                                       }
+                               },
                                () -> {
                                        action.run();
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
 Fri Nov 17 15:59:48 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Refresher;
 import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Test;
@@ -50,6 +49,7 @@ import java.util.function.Function;
 import static org.apache.aries.osgi.functional.OSGi.configuration;
 import static org.apache.aries.osgi.functional.OSGi.configurations;
 import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.nothing;
 import static org.apache.aries.osgi.functional.OSGi.onClose;
 import static org.apache.aries.osgi.functional.OSGi.once;
 import static org.apache.aries.osgi.functional.OSGi.register;
@@ -710,6 +710,128 @@ public class DSLTest {
         }
     }
 
+    @Test
+    public void testRecover() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> arrived = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recover(
+            (__, e) -> 0
+        ).effects(
+            arrived::add, left::add
+        ).
+        effects(
+            t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+            }
+            , __ -> {}
+        );
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+
+            arrived.removeAll(left);
+            assertEquals(arrived, result);
+        }
+    }
+
+    @Test
+    public void testRecoverWith() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> arrived = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).effects(
+            arrived::add, left::add
+        ).effects(
+            t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+            }
+            , __ -> {}
+        );
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+
+            arrived.removeAll(left);
+            assertEquals(arrived, result);
+        }
+    }
+
+    @Test
+    public void testOnCloseWithError() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).flatMap(t ->
+            onClose(() -> left.add(t)).then(just(t))
+        ).
+        flatMap(t -> {
+            if (t % 2 != 0) {
+                throw new RuntimeException();
+            }
+
+            return just(t);
+        });
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+        }
+    }
+
+    /*@Test
+    public void testRouteWithError() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).route(router -> {
+            AtomicReference<SentEvent<Integer>> sentEvent =
+                new AtomicReference<>();
+
+            router.onIncoming(event -> {
+                sentEvent.set(router.signalAdd(event));
+            });
+            router.onLeaving(__ -> sentEvent.get().terminate());
+        }).
+            effects(__ -> {}, left::add).
+            flatMap(t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+
+                return just(t);
+            });
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+        }
+    }*/
+
     private class Service {}
 
 }


Reply via email to