This is an automated email from the ASF dual-hosted git repository. jgallimore pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomee.git
commit d4edb4ac0c2fbe6de833ab2a17f85a56d5d5af3d
Author: Jonathan Gallimore <[email protected]>
AuthorDate: Tue Dec 1 22:39:58 2020 +0000

    WIP
---
 .../server/cxf/rs/CDISSEApplicationTest.java | 64 +++++++++++++++++++---
 1 file changed, 57 insertions(+), 7 deletions(-)

diff --git a/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java b/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java
index 9e350b5..38e688b 100644
--- a/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java
+++ b/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java
@@ -35,24 +35,28 @@ import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseEventSink;
+import javax.ws.rs.sse.*;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
 
 @EnableServices("jax-rs")
 @RunWith(ApplicationComposer.class)
-@Ignore
 public class CDISSEApplicationTest {
 
     private static int port = -1;
@@ -77,8 +81,53 @@ public class CDISSEApplicationTest {
     }
 
     @Test
-    public void isCdi() {
-        assertEquals("[{\"id\":1}]", ClientBuilder.newClient().target("http://localhost:" + port).path("/foo/sse").request().get(String.class));
+    public void testSse() throws Exception {
+        final List<Message> messages = new ArrayList<>();
+        final Client client = ClientBuilder.newClient();
+        final WebTarget target = client.target("http://localhost:" + port + "/foo/sse");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean complete = new AtomicBoolean(false);
+
+        new Thread(() -> {
+            try (SseEventSource source = SseEventSource.target(target).build()) {
+                source.register((inboundSseEvent) -> {
+                    final Message message = inboundSseEvent.readData(Message.class);
+                    System.out.println("*** Received ***");
+                    messages.add(message);
+                    latch.countDown();
+                    complete.set(true);
+                });
+                source.open();
+            }
+
+            System.out.println("Is open");
+
+            while (! complete.get()) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                }
+            }
+
+        }).start();
+
+        new Thread(() -> {
+            while (! complete.get()) {
+                final WebTarget sendTarget = ClientBuilder.newClient().target("http://localhost:" + port + "/foo/sse");
+                sendTarget.request().buildPost(Entity.entity("Test message", MediaType.TEXT_PLAIN_TYPE)).invoke();
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
+        }).start();
+
+        latch.await(1, TimeUnit.MINUTES);
+        assertEquals(1, messages.size());
+        assertEquals("Test message", messages.get(0).getText());
     }
 
     public static class MyCdiRESTApplication extends Application {
@@ -118,6 +167,7 @@ public class CDISSEApplicationTest {
         @Produces(MediaType.SERVER_SENT_EVENTS)
         public void stats(final @Context SseEventSink sink) {
             broadcaster.register(sink);
+            broadcaster.broadcast(createEvent(builder, eventId.incrementAndGet(), "Connected"));
         }
 
         private static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final long eventId, final String text) {
