Author: csierra
Date: Tue Oct 10 15:52:33 2017
New Revision: 1811732

URL: http://svn.apache.org/viewvc?rev=1811732&view=rev
Log:
[Component-DSL] Add concurrency tests

Disabled by default because they take some time.
The tests try to ensure that, when registering a counter effect, the
tuple always will execute it, either when the tuple is closed or in the
same moment if the tuple is already closed.

Same thing goes with OSGiResultImpl. If the result has been closed it
should never start. If, on the contrary, the program held by the result
had been started it should wait for it to finish starting before closing
it. If the program has not been started then the closing handlers will
not be executed.

Understading the close handlers as "counter effects" of the start
handlers, both Tuple and OSGiRsultImpl try to enforce the the net result
is always 0.

Of course the DSL can't make sure that the close handlers are the
"inverse" of the start handlers, or that the DSL users do not embed side
effects in allegedly pure operations.

Added:
    
aries/trunk/component-dsl/component-dsl/src/test/java/org/apache/aries/osgi/functional/internal/OSGiResultImplTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
Modified:
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java

Added: 
aries/trunk/component-dsl/component-dsl/src/test/java/org/apache/aries/osgi/functional/internal/OSGiResultImplTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/test/java/org/apache/aries/osgi/functional/internal/OSGiResultImplTest.java?rev=1811732&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/test/java/org/apache/aries/osgi/functional/internal/OSGiResultImplTest.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/test/java/org/apache/aries/osgi/functional/internal/OSGiResultImplTest.java
 Tue Oct 10 15:52:33 2017
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class OSGiResultImplTest {
+
+       @Test
+       public void testStartClose() {
+               AtomicBoolean started = new AtomicBoolean();
+               AtomicBoolean closed = new AtomicBoolean();
+
+               OSGiResultImpl result = new OSGiResultImpl(set(started), 
set(closed));
+
+               result.start();
+               result.close();
+
+               assertEquals(started.get(), closed.get());
+
+               started = new AtomicBoolean();
+               closed = new AtomicBoolean();
+
+               result = new OSGiResultImpl(set(started), set(closed));
+
+               result.close();
+               result.start();
+
+               assertEquals(started.get(), closed.get());
+       }
+
+       @Test
+       public void testAsynchronousStartClose() throws InterruptedException {
+               ExecutorService executorService = 
Executors.newFixedThreadPool(2);
+
+               AtomicBoolean started = new AtomicBoolean();
+               AtomicBoolean closed = new AtomicBoolean();
+
+
+               OSGiResultImpl result = new OSGiResultImpl(
+                       () -> {
+                               try {
+                                       Thread.sleep(1000L);
+
+                                       started.set(true);
+                               }
+                               catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       },
+                       set(closed)
+               );
+
+               executorService.execute(result::start);
+               executorService.execute(result::close);
+
+               executorService.awaitTermination(2, TimeUnit.SECONDS);
+
+               assertEquals(started.get(), closed.get());
+       }
+
+       @Test
+       @Ignore
+       public void testAsynchronousManyStartClose() throws 
InterruptedException {
+               ExecutorService executorService = 
Executors.newFixedThreadPool(50);
+
+               AtomicBoolean[] starteds = Stream.
+                       generate(AtomicBoolean::new).
+                       limit(1000).
+                       toArray(AtomicBoolean[]::new);
+
+               AtomicBoolean[] closeds = Stream.
+                       generate(AtomicBoolean::new).
+                       limit(1000).
+                       toArray(AtomicBoolean[]::new);
+
+               Random random = new Random(System.currentTimeMillis());
+
+               for (int i = 0; i < starteds.length; i++) {
+                       AtomicBoolean started = starteds[i];
+                       AtomicBoolean closed = closeds[i];
+
+                       OSGiResultImpl result = new OSGiResultImpl(
+                               () -> {
+                                       ignoreException(() -> 
Thread.sleep(random.nextInt(10)));
+
+                                       started.set(true);
+                               },
+                               set(closed)
+                       );
+
+                       executorService.execute(
+                               () -> {
+                                       ignoreException(() -> 
Thread.sleep(random.nextInt(2)));
+                                       result.start();
+                               });
+                       executorService.execute(
+                               () -> {
+                                       ignoreException(() -> 
Thread.sleep(random.nextInt(2)));
+                                       result.close();
+                               });
+               }
+
+               executorService.shutdown();
+
+               executorService.awaitTermination(100000, TimeUnit.MILLISECONDS);
+
+               long count = 
Arrays.stream(starteds).filter(AtomicBoolean::get).count();
+
+               assertTrue(count > 0);
+
+               for (int i = 0; i < closeds.length; i++) {
+                       assertEquals(starteds[i].get(), closeds[i].get());
+               }
+       }
+
+       private interface ExceptionalRunnable {
+               void run() throws Exception;
+       }
+
+       private static void ignoreException(ExceptionalRunnable callable) {
+               try {
+                       callable.run();
+               }
+               catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       public static Runnable set(AtomicBoolean atomicBoolean) {
+               return () -> atomicBoolean.set(true);
+       }
+
+}

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1811732&r1=1811731&r2=1811732&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 Tue Oct 10 15:52:33 2017
@@ -35,11 +35,11 @@ public class ProbeImpl<T> extends OSGiIm
 
     public Function<T, SentEvent<T>> getOperation() {
         return (t) -> {
-            Tuple<T> tuple = Tuple.create(t);
+            ProbeOperationImpl<T> operation = (ProbeOperationImpl<T>) 
_operation;
 
-            ((ProbeOperationImpl<T>)_operation)._op.accept(tuple);
+            Tuple<T> tuple = Tuple.create(t);
 
-            return new SentEvent<T>() {
+            SentEvent<T> sentEvent = new SentEvent<T>() {
                 @Override
                 public Event<T> getEvent() {
                     return tuple;
@@ -50,6 +50,12 @@ public class ProbeImpl<T> extends OSGiIm
                     tuple.terminate();
                 }
             };
+
+            if (!operation.closed) {
+                operation._op.accept(tuple);
+            }
+
+            return sentEvent;
         };
     }
 
@@ -57,6 +63,7 @@ public class ProbeImpl<T> extends OSGiIm
 
         BundleContext _bundleContext;
         Consumer<Tuple<T>> _op;
+        volatile boolean closed;
 
         @Override
         public OSGiResultImpl run(
@@ -64,7 +71,7 @@ public class ProbeImpl<T> extends OSGiIm
             _bundleContext = bundleContext;
             _op = op;
 
-            return new OSGiResultImpl(NOOP, NOOP);
+            return new OSGiResultImpl(NOOP, () -> closed = true);
         }
     }
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1811732&r1=1811731&r2=1811732&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 (original)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
 Tue Oct 10 15:52:33 2017
@@ -24,7 +24,9 @@ import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import static org.apache.aries.osgi.functional.OSGi.just;
@@ -40,6 +42,62 @@ public class ProbeTests {
         FrameworkUtil.getBundle(DSLTest.class).getBundleContext();
 
     @Test
+    public void testTupleTermination() {
+        AtomicReference<String> result = new AtomicReference<>("");
+
+        ProbeImpl<String> probeA = new ProbeImpl<>();
+        AtomicReference<ProbeImpl<String>> probeBreference = new 
AtomicReference<>();
+
+        OSGiImpl<String> program =
+            probeA.flatMap(a ->
+            onClose(
+                () -> result.accumulateAndGet("Hello", (x, y) -> x.replace(y, 
""))).
+            flatMap(__ -> {
+                ProbeImpl<String> probeB = new ProbeImpl<>();
+
+                probeBreference.set(probeB);
+
+                return probeB.flatMap(b ->
+                    onClose(
+                        () -> result.accumulateAndGet(", World", (x, y) -> 
x.replace(y, ""))).
+                        then(
+                            just(a + b)));
+            }
+
+        ));
+
+        program.run(bundleContext, result::set);
+
+        Function<String, SentEvent<String>> opA = probeA.getOperation();
+
+        SentEvent<String> sentA = opA.apply("Hello");
+
+        Function<String, SentEvent<String>> opB = 
probeBreference.get().getOperation();
+
+        sentA.terminate();
+
+        SentEvent<String> sentB = opB.apply(", World");
+        sentB.terminate();
+
+        assertEquals("", result.get());
+
+        program.run(bundleContext, result::set);
+
+        opA = probeA.getOperation();
+        sentA = opA.apply("Hello");
+
+        opB = probeBreference.get().getOperation();
+        sentB = opB.apply(", World");
+
+        assertEquals("Hello, World", result.get());
+
+        sentA.terminate();
+        sentB.terminate();
+
+        assertEquals("", result.get());
+    }
+
+    @Test
     public void testProbe() {
         AtomicInteger result = new AtomicInteger();
 

Added: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java?rev=1811732&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
 (added)
+++ 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
 Tue Oct 10 15:52:33 2017
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.test;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.SentEvent;
+import org.apache.aries.osgi.functional.internal.ProbeImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import java.util.Hashtable;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.apply;
+import static org.apache.aries.osgi.functional.OSGi.configurations;
+import static org.apache.aries.osgi.functional.OSGi.services;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+@Ignore
+public class AsynchronousTest {
+
+    static BundleContext bundleContext = FrameworkUtil.getBundle(
+        AsynchronousTest.class).getBundleContext();
+
+    @Test
+    public void testApplicative() throws InterruptedException {
+        int RUNS = 40;
+        AtomicBoolean[][][] started = new AtomicBoolean[RUNS][RUNS][RUNS];
+        AtomicBoolean[][][] closed = new AtomicBoolean[RUNS][RUNS][RUNS];
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    started[i][j][k] = new AtomicBoolean();
+                    closed[i][j][k] = new AtomicBoolean();
+                }
+            }
+        }
+
+        OSGi<Integer> as = services(Service.class, 
"(property=a)").map(Service::getI);
+        OSGi<Integer> bs = services(Service.class, 
"(property=b)").map(Service::getI);
+        OSGi<Integer> cs = services(Service.class, 
"(property=c)").map(Service::getI);
+
+        OSGi<int[]> combined = apply((x, y, z) -> new int[] {x, y, z}, as, bs, 
cs);
+
+        OSGi<?> program = combined.effects(
+            i -> started[i[0]][i[1]][i[2]].set(true),
+            i -> closed[i[0]][i[1]][i[2]].set(true));
+
+        OSGiResult result = program.run(bundleContext);
+
+        result.start();
+
+        ExecutorService executor = Executors.newFixedThreadPool(RUNS);
+
+        Random random = new Random(System.currentTimeMillis());
+
+        for (int i = 0; i < RUNS; i++) {
+            final int ii = i;
+            for (int j = 0; j < RUNS; j++) {
+                final int jj = j;
+                for (int k = 0; k < RUNS; k++) {
+                    final int kk = k;
+                    executor.execute(() -> {
+                        ignoreException(() -> 
Thread.sleep(random.nextInt(10)));
+
+                        ServiceRegistration<Service> sr =
+                            bundleContext.registerService(
+                                Service.class,
+                                new Service(ii),
+                                new Hashtable<String, Object>() {{
+                                    put("property", "a");
+                                }});
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sr.unregister();
+                    });
+                    executor.execute(() -> {
+                        ignoreException(() -> Thread.sleep(random.nextInt(5)));
+
+                        ServiceRegistration<Service> sr =
+                            bundleContext.registerService(
+                                Service.class,
+                                new Service(jj),
+                                new Hashtable<String, Object>() {{
+                                    put("property", "b");
+                                }});
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sr.unregister();
+                    });
+                    executor.execute(() -> {
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        ServiceRegistration<Service> sr =
+                            bundleContext.registerService(
+                                Service.class,
+                                new Service(kk),
+                                new Hashtable<String, Object>() {{
+                                    put("property", "c");
+                                }});
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sr.unregister();
+                    });
+                }
+            }
+        }
+
+        executor.shutdown();
+
+        boolean finished = executor.awaitTermination(1, TimeUnit.MINUTES);
+
+        System.out.println("******** FINISHED: " + finished);
+
+        int executedCount = 0;
+        int totalCount = 0;
+        int errors = 0;
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    if (started[i][j][k].get()) {
+                        executedCount ++;
+                    }
+
+                    if (!(started[i][j][k].get() == closed[i][j][k].get())) {
+                        errors ++;
+                    }
+
+                    totalCount ++;
+
+                }
+            }
+        }
+
+        System.out.println("******* TOTAL: " + totalCount);
+        System.out.println("******* EXECUTED: " + executedCount);
+        System.out.println("******* ERRORS: " + errors);
+
+        assertTrue(executedCount < totalCount);
+        assertEquals(0, errors);
+    }
+
+    @Test
+    public void testApplicativeConfiguration() throws InterruptedException {
+        int RUNS = 40;
+        AtomicBoolean[][][] started = new AtomicBoolean[RUNS][RUNS][RUNS];
+        AtomicBoolean[][][] closed = new AtomicBoolean[RUNS][RUNS][RUNS];
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    started[i][j][k] = new AtomicBoolean();
+                    closed[i][j][k] = new AtomicBoolean();
+                }
+            }
+        }
+
+        OSGi<Integer> as = services(Service.class, 
"(property=a)").map(Service::getI);
+        OSGi<Integer> bs = services(Service.class, 
"(property=b)").map(Service::getI);
+        OSGi<Integer> cs = configurations("configurationc").map(d -> 
(Integer)d.get("property"));
+
+        OSGi<int[]> combined = apply((x, y, z) -> new int[] {x, y, z}, as, bs, 
cs);
+
+        OSGi<?> program = combined.effects(
+            i -> started[i[0]][i[1]][i[2]].set(true),
+            i -> closed[i[0]][i[1]][i[2]].set(true));
+
+        OSGiResult result = program.run(bundleContext);
+
+        result.start();
+
+        ServiceReference<ConfigurationAdmin> configAdmin =
+            bundleContext.getServiceReference(ConfigurationAdmin.class);
+
+        ConfigurationAdmin configurationAdmin = bundleContext.getService(
+            configAdmin);
+
+        ExecutorService executor = Executors.newFixedThreadPool(RUNS);
+        ExecutorService executor2 = Executors.newFixedThreadPool(1);
+
+        Random random = new Random(System.currentTimeMillis());
+
+        for (int i = 0; i < RUNS; i++) {
+            final int ii = i;
+            for (int j = 0; j < RUNS; j++) {
+                final int jj = j;
+                for (int k = 0; k < RUNS; k++) {
+                    final int kk = k;
+                    executor.execute(() -> {
+                        ignoreException(() -> 
Thread.sleep(random.nextInt(10)));
+
+                        ServiceRegistration<Service> sr =
+                            bundleContext.registerService(
+                                Service.class,
+                                new Service(ii),
+                                new Hashtable<String, Object>() {{
+                                    put("property", "a");
+                                }});
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sr.unregister();
+                    });
+                    executor.execute(() -> {
+                        ignoreException(() -> Thread.sleep(random.nextInt(5)));
+
+                        ServiceRegistration<Service> sr =
+                            bundleContext.registerService(
+                                Service.class,
+                                new Service(jj),
+                                new Hashtable<String, Object>() {{
+                                    put("property", "b");
+                                }});
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sr.unregister();
+                    });
+                    executor2.execute(() ->
+                        ignoreException(() -> {
+                            Thread.sleep(random.nextInt(2));
+
+                            Configuration configurationc =
+                                configurationAdmin.createFactoryConfiguration(
+                                    "configurationc");
+
+                            configurationc.update(
+                                new Hashtable<String, Object>() {{
+                                    put("property", kk);
+                                }}
+                            );
+
+                            configurationc.delete();
+                        }));
+                }
+            }
+        }
+
+        executor.shutdown();
+
+        boolean finished = executor.awaitTermination(1, TimeUnit.MINUTES);
+
+        System.out.println("******** FINISHED: " + finished);
+
+        int executedCount = 0;
+        int totalCount = 0;
+        int errors = 0;
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    if (started[i][j][k].get()) {
+                        executedCount ++;
+                    }
+
+                    if (!(started[i][j][k].get() == closed[i][j][k].get())) {
+                        errors ++;
+                    }
+
+                    totalCount ++;
+
+                }
+            }
+        }
+
+        System.out.println("******* TOTAL: " + totalCount);
+        System.out.println("******* EXECUTED: " + executedCount);
+        System.out.println("******* ERRORS: " + errors);
+
+        assertTrue(executedCount < totalCount);
+        assertEquals(0, errors);
+    }
+
+
+    @Test
+    public void testApplicativeProbe() throws InterruptedException {
+        int RUNS = 40;
+        AtomicBoolean[][][] started = new AtomicBoolean[RUNS][RUNS][RUNS];
+        AtomicBoolean[][][] closed = new AtomicBoolean[RUNS][RUNS][RUNS];
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    started[i][j][k] = new AtomicBoolean();
+                    closed[i][j][k] = new AtomicBoolean();
+                }
+            }
+        }
+
+        OSGi<Integer> as = new ProbeImpl<>();
+        OSGi<Integer> bs = new ProbeImpl<>();
+        OSGi<Integer> cs = new ProbeImpl<>();
+
+        OSGi<int[]> combined = apply((x, y, z) -> new int[] {x, y, z}, as, bs, 
cs);
+
+        OSGi<?> program = combined.effects(
+            i -> started[i[0]][i[1]][i[2]].set(true),
+            i -> closed[i[0]][i[1]][i[2]].set(true));
+
+        OSGiResult result = program.run(bundleContext);
+
+        result.start();
+
+        Function<Integer, SentEvent<Integer>> opa = ((ProbeImpl<Integer>) 
as).getOperation();
+        Function<Integer, SentEvent<Integer>> opb = ((ProbeImpl<Integer>) 
bs).getOperation();
+        Function<Integer, SentEvent<Integer>> opc = ((ProbeImpl<Integer>) 
cs).getOperation();
+
+        ExecutorService executor = Executors.newFixedThreadPool(8);
+
+        Random random = new Random(System.currentTimeMillis());
+
+        for (int i = 0; i < RUNS; i++) {
+            final int ii = i;
+            for (int j = 0; j < RUNS; j++) {
+                final int jj = j;
+                for (int k = 0; k < RUNS; k++) {
+                    final int kk = k;
+                    executor.execute(() -> {
+                        ignoreException(() -> 
Thread.sleep(random.nextInt(10)));
+
+                        SentEvent<Integer> sentEvent = opa.apply(ii);
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sentEvent.terminate();
+                    });
+                    executor.execute(() -> {
+                        ignoreException(() -> Thread.sleep(random.nextInt(5)));
+
+                        SentEvent<Integer> sentEvent = opb.apply(jj);
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sentEvent.terminate();
+                    });
+                    executor.execute(() -> {
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        SentEvent<Integer> sentEvent = opc.apply(kk);
+
+                        ignoreException(() -> Thread.sleep(random.nextInt(2)));
+
+                        sentEvent.terminate();
+                    });
+                }
+            }
+        }
+
+        executor.shutdown();
+
+        boolean finished = executor.awaitTermination(2, TimeUnit.MINUTES);
+
+        System.out.println("******** FINISHED: " + finished);
+
+        int executedCount = 0;
+        int totalCount = 0;
+        int errors = 0;
+
+        for (int i = 0; i < RUNS; i++) {
+            for (int j = 0; j < RUNS; j++) {
+                for (int k = 0; k < RUNS; k++) {
+                    if (started[i][j][k].get()) {
+                        executedCount ++;
+                    }
+
+                    if (!(started[i][j][k].get() == closed[i][j][k].get())) {
+                        errors ++;
+                    }
+
+                    totalCount ++;
+
+                }
+            }
+        }
+
+        System.out.println("******* TOTAL: " + totalCount);
+        System.out.println("******* EXECUTED: " + executedCount);
+        System.out.println("******* ERRORS: " + errors);
+
+        assertTrue(executedCount < totalCount);
+        assertEquals(0, errors);
+
+    }
+
+
+    private interface ExceptionalRunnable {
+        void run() throws Exception;
+    }
+
+    private static void ignoreException(ExceptionalRunnable callable) {
+        try {
+            callable.run();
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private class Service {
+        public Service(int i) {
+            this.i = i;
+        }
+
+        int i;
+
+        public int getI() {
+            return i;
+        }
+    }
+}
+


Reply via email to