Author: csierra
Date: Thu May 17 12:56:27 2018
New Revision: 1831776

URL: http://svn.apache.org/viewvc?rev=1831776&view=rev
Log:
[Component-DSL] Add coalesce function

This function always keeps active the highest precedence program that
has produced a value.

Added:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java?rev=1831776&r1=1831775&r2=1831776&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
 Thu May 17 12:56:27 2018
@@ -29,6 +29,7 @@ import org.apache.aries.component.dsl.fu
 import org.apache.aries.component.dsl.function.Function6;
 import org.apache.aries.component.dsl.function.Function8;
 import org.apache.aries.component.dsl.function.Function9;
+import org.apache.aries.component.dsl.internal.CoalesceOSGiImpl;
 import org.apache.aries.component.dsl.internal.ConfigurationOSGiImpl;
 import org.apache.aries.component.dsl.internal.EffectsOSGi;
 import org.apache.aries.component.dsl.internal.NothingOSGiImpl;
@@ -106,6 +107,11 @@ public interface OSGi<T> extends OSGiRun
                return new ChangeContextOSGiImpl<>(program, bundleContext);
        }
 
+       @SafeVarargs
+       static <T> OSGi<T> coalesce(OSGi<T> ... programs) {
+               return new CoalesceOSGiImpl<>(programs);
+       }
+
        static <A, B, RES> OSGi<RES> combine(Function2<A, B, RES> fun, OSGi<A> 
a, OSGi<B> b) {
                return b.applyTo(a.applyTo(just(fun.curried())));
        }

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java?rev=1831776&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
 Thu May 17 12:56:27 2018
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class CoalesceOSGiImpl<T> extends OSGiImpl<T> {
+
+    @SafeVarargs
+    public CoalesceOSGiImpl(OSGi<T>... programs) {
+        super((bundleContext, op) -> {
+            AtomicBoolean initialized = new AtomicBoolean();
+            AtomicInteger[] atomicIntegers = new 
AtomicInteger[programs.length];
+            OSGiResult[] results = new OSGiResult[programs.length];
+            AtomicInteger index = new AtomicInteger();
+            Publisher<T>[] publishers = new Publisher[programs.length];
+
+            for (int i = 0; i < atomicIntegers.length; i++) {
+                atomicIntegers[i] = new AtomicInteger();
+            }
+
+            for (int i = 0; i < atomicIntegers.length; i++) {
+                AtomicInteger atomicInteger = atomicIntegers[i];
+
+                final int pos = i;
+
+                publishers[i] = t -> {
+                    OSGiResult result;
+
+                    synchronized (initialized) {
+                        atomicInteger.incrementAndGet();
+
+                        if (initialized.get()) {
+                            int indexInt = index.getAndSet(pos);
+
+                            if (pos < indexInt) {
+                                for (int j = pos + 1; j <= indexInt; j++) {
+                                    results[j].close();
+                                }
+
+                            }
+                        }
+
+                        result = op.publish(t);
+                    }
+
+                    return () -> {
+                        synchronized (initialized) {
+                            result.close();
+
+                            int current = atomicInteger.decrementAndGet();
+
+                            if (!initialized.get()) {
+                                return;
+                            }
+
+                            if (pos <= index.get() && current == 0) {
+                                for (int j = pos + 1; j < results.length; j++) 
{
+                                    results[j] = programs[j].run(
+                                        bundleContext, publishers[j]);
+
+                                    index.set(j);
+
+                                    if (atomicIntegers[j].get() > 0) {
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    };
+                };
+            }
+
+            synchronized (initialized) {
+                for (int i = 0; i < publishers.length; i++) {
+
+                    results[i] = programs[i].run(bundleContext, publishers[i]);
+
+                    index.set(i);
+
+                    if (atomicIntegers[i].get() > 0) {
+                        initialized.set(true);
+
+                        break;
+                    }
+                }
+
+                initialized.set(true);
+            }
+
+            return new OSGiResultImpl(
+                () -> {
+                    synchronized (initialized) {
+                        initialized.set(false);
+
+                        for (int i = 0; i <= index.get(); i++) {
+                            results[i].close();
+                        }
+                    }
+                }
+            );
+        });
+    }
+}

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java?rev=1831776&r1=1831775&r2=1831776&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
 Thu May 17 12:56:27 2018
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.aries.component.dsl.OSGi.NOOP;
+import static org.apache.aries.component.dsl.OSGi.coalesce;
 import static org.apache.aries.component.dsl.OSGi.configuration;
 import static org.apache.aries.component.dsl.OSGi.configurations;
 import static org.apache.aries.component.dsl.OSGi.just;
@@ -75,6 +76,116 @@ public class DSLTest {
         DSLTest.class).getBundleContext();
 
     @Test
+    public void testCoalesce() {
+        ProbeImpl<String> program1 = new ProbeImpl<>();
+        ProbeImpl<String> program2 = new ProbeImpl<>();
+
+        ArrayList<String> effects = new ArrayList<>();
+
+        OSGiResult result = coalesce(
+            program1, program2, just(Arrays.asList("fixed1", "fixed2")),
+            just("never")).
+            effects(effects::add, effects::add).
+            run(bundleContext);
+
+        Publisher<? super String> publisher1 = program1.getPublisher();
+        Publisher<? super String> publisher2 = program2.getPublisher();
+
+        assertEquals(Arrays.asList("fixed1", "fixed2"), effects);
+
+        OSGiResult event1Result = publisher2.publish("event1");
+
+        program2.onClose(event1Result);
+
+        assertEquals(
+            Arrays.asList(
+                "fixed1", "fixed2", "fixed2", "fixed1", "event1"), effects);
+
+        OSGiResult event2Result = publisher1.publish("event2");
+
+        assertEquals(
+            Arrays.asList(
+                "fixed1", "fixed2", "fixed2", "fixed1", "event1", "event1", 
"event2"), effects);
+
+        event2Result.close();
+
+        assertEquals(
+            Arrays.asList(
+                "fixed1", "fixed2", "fixed2", "fixed1", "event1", "event1", 
"event2", "event2", "fixed1", "fixed2"), effects);
+
+        event2Result = publisher1.publish("event3");
+
+        program1.onClose(event2Result);
+
+        assertEquals(
+            Arrays.asList(
+                "fixed1", "fixed2", "fixed2", "fixed1", "event1", "event1",
+                "event2", "event2", "fixed1", "fixed2", "fixed2", "fixed1",
+                "event3"),
+            effects);
+
+        result.close();
+
+        assertEquals(
+            Arrays.asList(
+                "fixed1", "fixed2", "fixed2", "fixed1", "event1", "event1",
+                "event2", "event2", "fixed1", "fixed2", "fixed2", "fixed1",
+                "event3", "event3"),
+            effects);
+    }
+
+    @Test
+    public void testCoalesceWhenEmpty() {
+        ProbeImpl<String> program1 = new ProbeImpl<>();
+        ProbeImpl<String> program2 = new ProbeImpl<>();
+
+        ArrayList<String> effects = new ArrayList<>();
+
+        OSGiResult result = coalesce(program1, program2).
+            effects(effects::add, effects::add).
+            run(bundleContext);
+
+        Publisher<? super String> publisher1 = program1.getPublisher();
+        Publisher<? super String> publisher2 = program2.getPublisher();
+
+        assertEquals(Collections.emptyList(), effects);
+
+        OSGiResult event1Result = publisher2.publish("event1");
+
+        program2.onClose(event1Result);
+
+        assertEquals(
+            Arrays.asList("event1"), effects);
+
+        OSGiResult event2Result = publisher1.publish("event2");
+
+        assertEquals(
+            Arrays.asList("event1", "event1", "event2"), effects);
+
+        event2Result.close();
+
+        assertEquals(
+            Arrays.asList("event1", "event1", "event2", "event2"), effects);
+
+        event2Result = publisher1.publish("event3");
+
+        program1.onClose(event2Result);
+
+        assertEquals(
+            Arrays.asList(
+                "event1", "event1", "event2", "event2", "event3"),
+            effects);
+
+        result.close();
+
+        assertEquals(
+            Arrays.asList(
+                "event1", "event1", "event2", "event2", "event3", "event3"),
+            effects);
+    }
+
+
+    @Test
     public void testJust() {
         AtomicInteger atomicInteger = new AtomicInteger(0);
 


Reply via email to