Author: struberg
Date: Sat Jun 17 09:06:35 2017
New Revision: 1798998
URL: http://svn.apache.org/viewvc?rev=1798998&view=rev
Log:
OWB-1188 improve test to show problems with async error handling
Modified:
openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java
Modified:
openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java
URL:
http://svn.apache.org/viewvc/openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java?rev=1798998&r1=1798997&r2=1798998&view=diff
==============================================================================
---
openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java
(original)
+++
openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java
Sat Jun 17 09:06:35 2017
@@ -24,14 +24,18 @@ import javax.enterprise.inject.spi.Exten
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.webbeans.test.AbstractUnitTest;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -39,33 +43,49 @@ import static org.junit.Assert.assertNot
public class ObserversAsyncTest extends AbstractUnitTest
{
+
+
@Test
- public void testAsyncEventExceptionHandling() throws ExecutionException,
InterruptedException
+ public void testAsyncEventExceptionHandling_handle() throws
ExecutionException, InterruptedException
{
- final int count = 100 + ForkJoinPool.getCommonPoolParallelism() * 10;
+ final int count = 10 + ForkJoinPool.getCommonPoolParallelism() * 5;
final VisitorCollectorEvent event = new VisitorCollectorEvent();
- addExtension(new Extension() {
- void addABunchOfObserversAtLeastMoreThanThreads(@Observes final
AfterBeanDiscovery afterBeanDiscovery)
+ addExtension(new ParallelObserveExtension(count));
+ startContainer();
+
+ BlockingQueue<Throwable> queue = new LinkedBlockingQueue<>();
+
+ long start = System.nanoTime();
+
+ getBeanManager().getEvent().fireAsync(event)
+ .handle((e, t) ->
{
- IntStream.range(0, count)
- .forEach(i -> afterBeanDiscovery.addObserverMethod()
- .observedType(VisitorCollectorEvent.class)
- .async(true)
- .notifyWith(e ->
- {
- if (i % 2 == 0 && (i < 30 || i > 70))
- {
- sleep(500);
- }
-
- final String name = "Observer" + i;
- event.visiting(name);
- throw new IllegalStateException(name);
- }));
- }
- });
+ return queue.offer(t);
+ });
+
+ Throwable t = queue.poll(20, TimeUnit.SECONDS);
+
+ long end = System.nanoTime();
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(end - start);
+ System.out.println("took ms: " + durationMs);
+
+ Assert.assertNotNull(t);
+ Assert.assertTrue(t instanceof CompletionException);
+ CompletionException ce = (CompletionException) t;
+ Assert.assertEquals(count, ce.getSuppressed().length);
+
+ }
+
+ @Test
+ public void testAsyncEventExceptionHandling_CompletableFuture() throws
ExecutionException, InterruptedException
+ {
+ final int count = 10 + ForkJoinPool.getCommonPoolParallelism() * 5;
+
+ final VisitorCollectorEvent event = new VisitorCollectorEvent();
+
+ addExtension(new ParallelObserveExtension(count));
startContainer();
final AtomicReference<Throwable> observerException = new
AtomicReference<>();
@@ -118,4 +138,33 @@ public class ObserversAsyncTest extends
// ignore
}
}
+
+ private class ParallelObserveExtension implements Extension
+ {
+ private final int count;
+
+ public ParallelObserveExtension(int count)
+ {
+ this.count = count;
+ }
+
+ void addABunchOfObserversAtLeastMoreThanThreads(@Observes final
AfterBeanDiscovery afterBeanDiscovery)
+ {
+ IntStream.range(0, count)
+ .forEach(i ->
afterBeanDiscovery.<VisitorCollectorEvent>addObserverMethod()
+ .observedType(VisitorCollectorEvent.class)
+ .async(true)
+ .notifyWith(e ->
+ {
+ if (i % 2 == 0 && (i < 30 || i > 70))
+ {
+ sleep(500);
+ }
+
+ final String name = "Observer" + i;
+ e.getEvent().visiting(name);
+ throw new IllegalStateException(name);
+ }));
+ }
+ }
}