This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 8c1046a CXF-8561: SseEventSource sometimes sends partial events when
being closed (#825)
8c1046a is described below
commit 8c1046a5ce16248b4ca0d4d61f16d7b066b259d2
Author: Andriy Redko <[email protected]>
AuthorDate: Fri Jul 9 09:48:40 2021 -0400
CXF-8561: SseEventSource sometimes sends partial events when being closed
(#825)
---
.../jaxrs/sse/client/InboundSseEventProcessor.java | 18 +++++++-
.../cxf/jaxrs/sse/client/SseEventSourceImpl.java | 24 +++++++++-
.../jaxrs/sse/client/SseEventSourceImplTest.java | 51 +++++++++++++++++++---
3 files changed, 85 insertions(+), 8 deletions(-)
diff --git
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 016282c..c1e29ee 100644
---
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -56,12 +56,18 @@ public class InboundSseEventProcessor {
private final Endpoint endpoint;
private final InboundSseEventListener listener;
private final ExecutorService executor;
+ private final boolean discardIncomplete;
private volatile boolean closed;
protected InboundSseEventProcessor(Endpoint endpoint,
InboundSseEventListener listener) {
+ this(endpoint, listener, true);
+ }
+
+ protected InboundSseEventProcessor(Endpoint endpoint,
InboundSseEventListener listener, boolean discardIncomplete) {
this.endpoint = endpoint;
this.listener = listener;
+ this.discardIncomplete = discardIncomplete;
this.executor = Executors.newSingleThreadScheduledExecutor();
}
@@ -116,7 +122,17 @@ public class InboundSseEventProcessor {
}
if (builder != null) {
- listener.onNext(builder.build(factory, message));
+ // As per https://www.w3.org/TR/2021/SPSD-eventsource-20210128/#event-stream-interpretation:
+ //
+ // ... Once the end of the file is reached, any pending data must be discarded.
+ // (If the file ends in the middle of an event, before the final empty line,
+ // the incomplete event is not dispatched.) ...
+ //
+ if (discardIncomplete /* default */) {
+ LOG.fine("Discarding incomplete SSE event");
+ } else {
+ listener.onNext(builder.build(factory, message));
+ }
}
// complete the stream
diff --git
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index 7bda05c..7a15099 100644
---
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -49,6 +49,9 @@ import org.apache.cxf.jaxrs.utils.ExceptionUtils;
* SSE Event Source implementation
*/
public class SseEventSourceImpl implements SseEventSource {
+ // Whether or not incomplete SSE events should be discarded (by default, they will be discarded)
+ public static final String DISCARD_INCOMPLETE_EVENTS =
"org.apache.cxf.sse.discard.incomplete.events";
+
private static final Logger LOG =
LogUtils.getL7dLogger(SseEventSourceImpl.class);
private final WebTarget target;
@@ -246,8 +249,10 @@ public class SseEventSourceImpl implements SseEventSource {
final Endpoint endpoint =
WebClient.getConfig(target).getEndpoint();
// Create new processor if this is the first time or the old one has been closed
if (processor == null || processor.isClosed()) {
- LOG.fine("Creating new instance of SSE event processor ...");
- processor = new InboundSseEventProcessor(endpoint, delegate);
+ final boolean discardIncomplete =
getConfigurationProperty(DISCARD_INCOMPLETE_EVENTS, true);
+ LOG.fine("Creating new instance of SSE event processor
(discard incomplete events is set to '"
+ + discardIncomplete + "') ...");
+ processor = new InboundSseEventProcessor(endpoint, delegate,
discardIncomplete);
}
// Start consuming events
@@ -364,4 +369,19 @@ public class SseEventSourceImpl implements SseEventSource {
open = false;
}
}
+
+ private boolean getConfigurationProperty(String name, boolean
defaultValue) {
+ final Configuration configuration = target.getConfiguration();
+
+ if (configuration != null) {
+ final Object value = configuration.getProperty(name);
+ if (value instanceof Boolean) {
+ return (Boolean)value;
+ } else if (value != null) {
+ return Boolean.valueOf(value.toString());
+ }
+ }
+
+ return defaultValue;
+ }
}
diff --git
a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 8a1b3e1..1b21676 100644
---
a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++
b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.cxf.jaxrs.sse.client;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.function.Function;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
+import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Context;
@@ -272,10 +274,13 @@ public class SseEventSourceImplTest {
assertThat(events.get(0).getName(), nullValue());
assertThat(events.get(0).readData(), equalTo("just test data\nin
multiple lines"));
}
-
+
@Test
public void testNoReconnectAndJustEventNameIsReceived() throws
InterruptedException, IOException {
- try (SseEventSource eventSource =
withNoReconnect(Type.EVENT_JUST_NAME)) {
+ final Map<String, Object> properties = Collections
+ .singletonMap(SseEventSourceImpl.DISCARD_INCOMPLETE_EVENTS, false);
+
+ try (SseEventSource eventSource =
withNoReconnect(Type.EVENT_JUST_NAME, properties)) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
@@ -292,6 +297,25 @@ public class SseEventSourceImplTest {
}
@Test
+ public void testNoReconnectAndIncompleteEventIsDiscarded() throws
InterruptedException, IOException {
+ try (SseEventSource eventSource =
withNoReconnect(Type.EVENT_JUST_NAME)) {
+ eventSource.open();
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150L);
+ }
+
+ // incomplete event should be discarded
+ await()
+ .during(Duration.ofMillis(500L))
+ .until(events::isEmpty);
+
+ assertThat(events.size(), equalTo(0));
+ }
+
+ @Test
public void testNoReconnectAndMixedEventsAreReceived() throws
InterruptedException, IOException {
try (SseEventSource eventSource = withNoReconnect(Type.EVENT_MIXED)) {
eventSource.open();
@@ -478,11 +502,19 @@ public class SseEventSourceImplTest {
}
private SseEventSource withNoReconnect(Type type) {
- return withNoReconnect(type, null);
+ return withNoReconnect(type, null, Collections.emptyMap());
+ }
+
+ private SseEventSource withNoReconnect(Type type, Map<String, Object>
properties) {
+ return withNoReconnect(type, null, properties);
}
private SseEventSource withNoReconnect(Type type, String lastEventId) {
- SseEventSource eventSource = SseEventSource.target(target(type,
lastEventId)).build();
+ return withNoReconnect(type, lastEventId, Collections.emptyMap());
+ }
+
+ private SseEventSource withNoReconnect(Type type, String lastEventId,
Map<String, Object> properties) {
+ SseEventSource eventSource = SseEventSource.target(target(type,
lastEventId, properties)).build();
eventSource.register(events::add, errors::add);
return eventSource;
}
@@ -500,7 +532,16 @@ public class SseEventSourceImplTest {
}
private static WebTarget target(Type type, String lastEventId) {
- final WebTarget target =
ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+ return target(type, lastEventId, Collections.emptyMap());
+ }
+
+ private static WebTarget target(Type type, String lastEventId, Map<String,
Object> properties) {
+ final Client client = ClientBuilder.newClient();
+ if (properties != null) {
+ properties.forEach(client::property);
+ }
+
+ final WebTarget target = client.target(LOCAL_ADDRESS + type.name());
if (lastEventId != null) {
target.property(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
}