This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 95a200a6467830eca7432bf9312c514206a4196f Author: reta <[email protected]> AuthorDate: Sun Mar 22 12:58:18 2020 -0400 CXF-8249: SSE client refuses to accept valid stream (cherry picked from commit 16cbb21aa3b2fb1cb0ad5c049ebe4a9771b11b7c) (cherry picked from commit 1bd2d4519fa86981d7277ca450b5090ce4c2af55) # Conflicts: # systests/spring-boot/pom.xml # systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java --- parent/pom.xml | 5 + .../jaxrs/sse/client/InboundSseEventProcessor.java | 50 ++++++-- systests/pom.xml | 1 + .../cxf/systest/jaxrs/sse/AbstractSseTest.java | 28 ++++- .../apache/cxf/systest/jaxrs/sse/BookStore.java | 39 ++++-- .../apache/cxf/systest/jaxrs/sse/BookStore2.java | 39 ++++-- .../jaxrs/sse/BookStoreClientCloseable.java | 18 ++- systests/spring-boot/pom.xml | 140 +++++++++++++++++++++ .../apache/cxf/systest/jaxrs/resources/Book.java | 68 ++++++++++ .../jaxrs/spring/boot/SpringSseEmitterTest.java | 132 +++++++++++++++++++ 10 files changed, 482 insertions(+), 38 deletions(-) diff --git a/parent/pom.xml b/parent/pom.xml index 074ce8a..917fa54 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -817,6 +817,11 @@ </dependency> <dependency> <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-sse</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-rs-extension-providers</artifactId> <version>${project.version}</version> </dependency> diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java index 2412140..016282c 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java @@ -47,11 +47,11 @@ public class InboundSseEventProcessor { public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS); private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class); - private static final String COMMENT = ": "; - private static final String EVENT = "event: "; - private static final String ID = "id: "; - private static final String RETRY = "retry: "; - private static final String DATA = "data: "; + private static final String COMMENT = ":"; + private static final String EVENT = "event:"; + private static final String ID = "id:"; + private static final String RETRY = "retry:"; + private static final String DATA = "data:"; private final Endpoint endpoint; private final InboundSseEventListener listener; @@ -93,16 +93,23 @@ public class InboundSseEventProcessor { builder = null; /* reset the builder for next event */ listener.onNext(event); } else { + // Parsing and interpreting event stream: + // https://www.w3.org/TR/eventsource/#parsing-an-event-stream if (line.startsWith(EVENT)) { - builder = getOrCreate(builder).name(line.substring(EVENT.length())); + int beginIndex = findFirstNonSpacePosition(line, EVENT); + builder = getOrCreate(builder).name(line.substring(beginIndex)); } else if (line.startsWith(ID)) { - builder = getOrCreate(builder).id(line.substring(ID.length())); + int beginIndex = findFirstNonSpacePosition(line, ID); + builder = getOrCreate(builder).id(line.substring(beginIndex)); } else if (line.startsWith(COMMENT)) { - builder = getOrCreate(builder).comment(line.substring(COMMENT.length())); + int beginIndex = findFirstNonSpacePosition(line, COMMENT); + builder = getOrCreate(builder).comment(line.substring(beginIndex)); } else if (line.startsWith(RETRY)) { - builder = getOrCreate(builder).reconnectDelay(line.substring(RETRY.length())); + int beginIndex = findFirstNonSpacePosition(line, RETRY); + builder = getOrCreate(builder).reconnectDelay(line.substring(beginIndex)); } else if (line.startsWith(DATA)) { - builder = getOrCreate(builder).appendData(line.substring(DATA.length())); + int beginIndex = findFirstNonSpacePosition(line, DATA); + builder = getOrCreate(builder).appendData(line.substring(beginIndex)); } } line = reader.readLine(); @@ -156,4 +163,27 @@ public class InboundSseEventProcessor { private static Builder getOrCreate(final Builder builder) { return (builder == null) ? new InboundSseEventImpl.Builder() : builder; } + + /** + * Remove only leading spaces from the line as per specification, space after + * the colon is optional. + * + * The following stream fires two identical events: + * + * data:test + * data: test + * + * This is because the space after the colon is ignored if present. + */ + private static int findFirstNonSpacePosition(final String str, final String prefix) { + int beginIndex = prefix.length(); + + for (; beginIndex < str.length(); ++beginIndex) { + if (str.charAt(beginIndex) != ' ') { + break; + } + } + + return beginIndex; + } } diff --git a/systests/pom.xml b/systests/pom.xml index ac5e163..9952c00 100644 --- a/systests/pom.xml +++ b/systests/pom.xml @@ -54,5 +54,6 @@ <module>ws-transfer</module> <module>rs-sse</module> <module>microprofile</module> + <module>spring-boot</module> </modules> </project> diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java index 0e1158a..87c130a 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -101,6 +101,28 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest { } @Test + public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException { + final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse"); + final Collection<String> titles = new ArrayList<>(); + + try (SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collectRaw(titles), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + awaitEvents(5000, titles, 4); + } + // Easing the test verification here, it does not work well for Atm + Jetty + assertThat(titles, + hasItems( + "New Book #1", + "New Book #2", + "New Book #3", + "New Book #4" + ) + ); + } + + @Test public void testNoDataIsReturnedFromInboundSseEvents() throws InterruptedException { final WebTarget target = createWebTarget("/rest/api/bookstore/nodata"); final Collection<Book> books = new ArrayList<>(); @@ -310,7 +332,11 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest { return false; } - private static Consumer<InboundSseEvent> collect(final Collection< Book > books) { + private static Consumer<InboundSseEvent> collect(final Collection<Book> books) { return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE)); } + + private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) { + return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE)); + } } diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java index 41ffd34..de28eef 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java @@ -78,13 +78,13 @@ public class BookStore extends BookStoreClientCloseable { final Integer id = Integer.valueOf(lastEventId); final Builder builder = sse.newEventBuilder(); - sink.send(createStatsEvent(builder.name("book"), id + 1)); + sink.send(createEvent(builder.name("book"), id + 1)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 2)); + sink.send(createEvent(builder.name("book"), id + 2)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 3)); + sink.send(createEvent(builder.name("book"), id + 3)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 4)); + sink.send(createEvent(builder.name("book"), id + 4)); Thread.sleep(200); sink.close(); } catch (final InterruptedException ex) { @@ -102,11 +102,28 @@ public class BookStore extends BookStoreClientCloseable { CompletableFuture .runAsync(() -> { - sink.send(createStatsEvent(builder.name("book"), 1)); - sink.send(createStatsEvent(builder.name("book"), 2)); - sink.send(createStatsEvent(builder.name("book"), 3)); - sink.send(createStatsEvent(builder.name("book"), 4)); - sink.send(createStatsEvent(builder.name("book"), 5)); + sink.send(createEvent(builder.name("book"), 1)); + sink.send(createEvent(builder.name("book"), 2)); + sink.send(createEvent(builder.name("book"), 3)); + sink.send(createEvent(builder.name("book"), 4)); + sink.send(createEvent(builder.name("book"), 5)); + }) + .whenComplete((r, ex) -> sink.close()); + } + + @GET + @Path("/titles/sse") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void forBookTitlesOnly(@Context SseEventSink sink) { + final Builder builder = sse.newEventBuilder(); + + CompletableFuture + .runAsync(() -> { + sink.send(createRawEvent(builder.name("book"), 1)); + sink.send(createRawEvent(builder.name("book"), 2)); + sink.send(createRawEvent(builder.name("book"), 3)); + sink.send(createRawEvent(builder.name("book"), 4)); + sink.send(createRawEvent(builder.name("book"), 5)); }) .whenComplete((r, ex) -> sink.close()); } @@ -139,8 +156,8 @@ public class BookStore extends BookStoreClientCloseable { } final Builder builder = sse.newEventBuilder(); - broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000)) - .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { }) + broadcaster.broadcast(createEvent(builder.name("book"), 1000)) + .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { }) .whenComplete((r, ex) -> { if (broadcaster != null) { broadcaster.close(); diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java index d7abb04..a922d08 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java @@ -77,13 +77,13 @@ public class BookStore2 extends BookStoreClientCloseable { final Integer id = Integer.valueOf(lastEventId); final Builder builder = sse.newEventBuilder(); - sink.send(createStatsEvent(builder.name("book"), id + 1)); + sink.send(createEvent(builder.name("book"), id + 1)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 2)); + sink.send(createEvent(builder.name("book"), id + 2)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 3)); + sink.send(createEvent(builder.name("book"), id + 3)); Thread.sleep(200); - sink.send(createStatsEvent(builder.name("book"), id + 4)); + sink.send(createEvent(builder.name("book"), id + 4)); Thread.sleep(200); sink.close(); } catch (final InterruptedException ex) { @@ -101,11 +101,28 @@ public class BookStore2 extends BookStoreClientCloseable { CompletableFuture .runAsync(() -> { - sink.send(createStatsEvent(builder.name("book"), 1)); - sink.send(createStatsEvent(builder.name("book"), 2)); - sink.send(createStatsEvent(builder.name("book"), 3)); - sink.send(createStatsEvent(builder.name("book"), 4)); - sink.send(createStatsEvent(builder.name("book"), 5)); + sink.send(createEvent(builder.name("book"), 1)); + sink.send(createEvent(builder.name("book"), 2)); + sink.send(createEvent(builder.name("book"), 3)); + sink.send(createEvent(builder.name("book"), 4)); + sink.send(createEvent(builder.name("book"), 5)); + }) + .whenComplete((r, ex) -> sink.close()); + } + + @GET + @Path("/titles/sse") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void forBookTitlesOnly(@Context SseEventSink sink) { + final Builder builder = sse.newEventBuilder(); + + CompletableFuture + .runAsync(() -> { + sink.send(createRawEvent(builder.name("book"), 1)); + sink.send(createRawEvent(builder.name("book"), 2)); + sink.send(createRawEvent(builder.name("book"), 3)); + sink.send(createRawEvent(builder.name("book"), 4)); + sink.send(createRawEvent(builder.name("book"), 5)); }) .whenComplete((r, ex) -> sink.close()); } @@ -138,8 +155,8 @@ public class BookStore2 extends BookStoreClientCloseable { } final Builder builder = sse.newEventBuilder(); - broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000)) - .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { }) + broadcaster.broadcast(createEvent(builder.name("book"), 1000)) + .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { }) .whenComplete((r, ex) -> { if (broadcaster != null) { broadcaster.close(); diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java index b9b599d..2fb50a6 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java @@ -64,14 +64,14 @@ abstract class BookStoreClientCloseable { localBroadcaster.onClose(sseEventSink -> stats.closed()); localBroadcaster.register(sink); - localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1)) + localBroadcaster.broadcast(createEvent(builder.name("book"), id + 1)) .whenComplete((r, ex) -> stats.inc()); // Await client to confirm the it got the event (PUT /client-closes-connection/received) phaser.arriveAndAwaitAdvance(); Thread.sleep(500); - localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2)) + localBroadcaster.broadcast(createEvent(builder.name("book"), id + 2)) .whenComplete((r, ex) -> { // we expect exception here if (ex == null && !sink.isClosed()) { @@ -85,7 +85,7 @@ abstract class BookStoreClientCloseable { // This event should complete exceptionally since SseEventSource should be // closed already. Thread.sleep(500); - localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3)) + localBroadcaster.broadcast(createEvent(builder.name("book"), id + 3)) .whenComplete((r, ex) -> { // we expect exception here if (ex == null && !sink.isClosed()) { @@ -96,7 +96,7 @@ abstract class BookStoreClientCloseable { // This event should complete immediately since the sink has been removed // from the broadcaster (closed). Thread.sleep(500); - localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4)) + localBroadcaster.broadcast(createEvent(builder.name("book"), id + 4)) .whenComplete((r, ex) -> { // we expect the sink to be closed at this point if (ex != null || !sink.isClosed()) { @@ -136,11 +136,19 @@ abstract class BookStoreClientCloseable { return stats; } - protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) { + protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) { return builder .id(Integer.toString(eventId)) .data(Book.class, new Book("New Book #" + eventId, eventId)) .mediaType(MediaType.APPLICATION_JSON_TYPE) .build(); } + + protected static OutboundSseEvent createRawEvent(final OutboundSseEvent.Builder builder, final int eventId) { + return builder + .id(Integer.toString(eventId)) + .data("New Book #" + eventId) + .mediaType(MediaType.TEXT_PLAIN_TYPE) + .build(); + } } diff --git a/systests/spring-boot/pom.xml b/systests/spring-boot/pom.xml new file mode 100644 index 0000000..4196238 --- /dev/null +++ b/systests/spring-boot/pom.xml @@ -0,0 +1,140 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>cxf-parent</artifactId> + <groupId>org.apache.cxf</groupId> + <version>3.2.13-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.cxf.systests</groupId> + <artifactId>cxf-systests-spring-boot</artifactId> + <name>Apache CXF Spring Boot Integration System Tests</name> + <description>Apache CXF Spring Boot Integration System Tests</description> + <url>https://cxf.apache.org</url> + + <properties> + <cxf.module.name>org.apache.cxf.systests.spring.boot</cxf.module.name> + </properties> + + <build> + <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory> + <testResources> + <testResource> + <directory>src/test/java</directory> + <excludes> + <exclude>**/*.java</exclude> + </excludes> + </testResource> + <testResource> + <directory>src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + </testResource> + </testResources> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <archive> + <manifestEntries> + <Automatic-Module-Name>${cxf.module.name}.tests</Automatic-Module-Name> + </manifestEntries> + </archive> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + <version>2.0.1.Final</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-spring-boot-starter-jaxrs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-sse</artifactId> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-service-description-openapi-v3</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-testutils</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <version>${cxf.spring.boot.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.xmlunit</groupId> + <artifactId>xmlunit-core</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java new file mode 100644 index 0000000..0dc4165 --- /dev/null +++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.resources; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; + +public class Book { + private String title; + private String author; + + public Book() { + } + + public Book(final String title, final String author) { + this.title = title; + this.author = author; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getAuthor() { + return author; + } + + public void setAuthor(String author) { + this.author = author; + } + + @Override + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java new file mode 100644 index 0000000..e4c10cc --- /dev/null +++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.spring.boot; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.systest.jaxrs.resources.Book; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.embedded.LocalServerPort; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = SpringSseEmitterTest.LibraryController.class) +public class SpringSseEmitterTest { + @LocalServerPort + private int port; + + @RestController + @EnableAutoConfiguration + static class LibraryController { + @GetMapping("/sse") + public SseEmitter streamSseMvc() { + final SseEmitter emitter = new SseEmitter(); + final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor(); + + sseMvcExecutor.execute(() -> { + try { + for (int eventId = 1; eventId <= 5; ++eventId) { + SseEventBuilder event = SseEmitter.event() + .id(Integer.toString(eventId)) + .data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON) + .name("book"); + emitter.send(event); + Thread.sleep(100); + } + } catch (Exception ex) { + emitter.completeWithError(ex); + } + }); + + return emitter; + } + } + + @Test + public void testSseEvents() throws InterruptedException { + final WebTarget target = createWebTarget(); + final Collection<Book> books = new ArrayList<>(); + + try (SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collect(books), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + awaitEvents(5000, books, 5); + } + + assertThat(books, + hasItems( + new Book("New Book #1", "Author #1"), + new Book("New Book #2", "Author #2"), + new Book("New Book #3", "Author #3"), + new Book("New Book #4", "Author #4"), + new Book("New Book #5", "Author #5") + ) + ); + } + + private WebTarget createWebTarget() { + return ClientBuilder + .newClient() + .property("http.receive.timeout", 8000) + .register(JacksonJsonProvider.class) + .target("http://localhost:" + port + "/sse"); + } + + private static Consumer<InboundSseEvent> collect(final Collection< Book > books) { + return event -> books.add(event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE)); + } + + private void awaitEvents(long timeout, final Collection<?> events, int size) throws InterruptedException { + final long sleep = timeout / 10; + + for (int i = 0; i < timeout; i += sleep) { + if (events.size() == size) { + break; + } else { + Thread.sleep(sleep); + } + } + } +}
