This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
     new 9dffd02  CXF-8561: SseEventSource sometimes sends partial events when being closed (#825)
9dffd02 is described below

commit 9dffd02230b66ef9af0914fc8bcb92a5a8876909
Author: Andriy Redko <[email protected]>
AuthorDate: Fri Jul 9 09:48:40 2021 -0400

    CXF-8561: SseEventSource sometimes sends partial events when being closed (#825)
    
    (cherry picked from commit 8c1046a5ce16248b4ca0d4d61f16d7b066b259d2)
    (cherry picked from commit 55f94cac6b9f5900802ba9f6cc4973f3dc6062e0)
---
 .../jaxrs/sse/client/InboundSseEventProcessor.java | 18 +++++++-
 .../cxf/jaxrs/sse/client/SseEventSourceImpl.java   | 24 +++++++++-
 .../jaxrs/sse/client/SseEventSourceImplTest.java   | 51 +++++++++++++++++++---
 3 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 016282c..c1e29ee 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -56,12 +56,18 @@ public class InboundSseEventProcessor {
     private final Endpoint endpoint;
     private final InboundSseEventListener listener;
     private final ExecutorService executor;
+    private final boolean discardIncomplete;
     
     private volatile boolean closed;
     
     protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) {
+        this(endpoint, listener, true);
+    }
+    
+    protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener, boolean discardIncomplete) {
         this.endpoint = endpoint;
         this.listener = listener;
+        this.discardIncomplete = discardIncomplete;
         this.executor = Executors.newSingleThreadScheduledExecutor();
     }
     
@@ -116,7 +122,17 @@ public class InboundSseEventProcessor {
                 }
                 
                 if (builder != null) {
-                    listener.onNext(builder.build(factory, message));
+                    // As per https://www.w3.org/TR/2021/SPSD-eventsource-20210128/#event-stream-interpretation:
+                    //
+                    //   ... Once the end of the file is reached, any pending data must be discarded. 
+                    //   (If the file ends in the middle of an event, before the final empty line, 
+                    //   the incomplete event is not dispatched.) ...
+                    //
+                    if (discardIncomplete /* default */) {
+                        LOG.fine("Discarding incomplete SSE event");
+                    } else {
+                        listener.onNext(builder.build(factory, message));
+                    }
                 }
 
                 // complete the stream
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index 7bda05c..7a15099 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -49,6 +49,9 @@ import org.apache.cxf.jaxrs.utils.ExceptionUtils;
  * SSE Event Source implementation 
  */
 public class SseEventSourceImpl implements SseEventSource {
+    // Whether or not incomplete SSE events should be discarded (by default, they will be discarded)
+    public static final String DISCARD_INCOMPLETE_EVENTS = "org.apache.cxf.sse.discard.incomplete.events";
+
     private static final Logger LOG = LogUtils.getL7dLogger(SseEventSourceImpl.class);
     
     private final WebTarget target;
@@ -246,8 +249,10 @@ public class SseEventSourceImpl implements SseEventSource {
             final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();
             // Create new processor if this is the first time or the old one has been closed 
             if (processor == null || processor.isClosed()) {
-                LOG.fine("Creating new instance of SSE event processor ...");
-                processor = new InboundSseEventProcessor(endpoint, delegate);
+                final boolean discardIncomplete = getConfigurationProperty(DISCARD_INCOMPLETE_EVENTS, true);
+                LOG.fine("Creating new instance of SSE event processor (discard incomplete events is set to '" 
+                    + discardIncomplete + "') ...");
+                processor = new InboundSseEventProcessor(endpoint, delegate, discardIncomplete);
             }
             
             // Start consuming events
@@ -364,4 +369,19 @@ public class SseEventSourceImpl implements SseEventSource {
             open = false;
         }
     }
+    
+    private boolean getConfigurationProperty(String name, boolean defaultValue) {
+        final Configuration configuration = target.getConfiguration();
+        
+        if (configuration != null) {
+            final Object value = configuration.getProperty(name);
+            if (value instanceof Boolean) {
+                return (Boolean)value;
+            } else if (value != null) {
+                return Boolean.valueOf(value.toString());
+            }
+        }
+        
+        return defaultValue;
+    }
 }
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 0bef971..b5f9e60 100644
--- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.cxf.jaxrs.sse.client;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.function.Function;
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.GET;
 import javax.ws.rs.Produces;
+import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Context;
@@ -272,10 +274,13 @@ public class SseEventSourceImplTest {
         assertThat(events.get(0).getName(), nullValue());
         assertThat(events.get(0).readData(), equalTo("just test data\nin multiple lines"));
     }
-
+    
     @Test
     public void testNoReconnectAndJustEventNameIsReceived() throws InterruptedException, IOException {
-        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_JUST_NAME)) {
+        final Map<String, Object> properties = Collections
+            .singletonMap(SseEventSourceImpl.DISCARD_INCOMPLETE_EVENTS, false);
+        
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_JUST_NAME, properties)) {
             eventSource.open();
 
             assertThat(eventSource.isOpen(), equalTo(true));
@@ -292,6 +297,25 @@ public class SseEventSourceImplTest {
     }
 
     @Test
+    public void testNoReconnectAndIncompleteEventIsDiscarded() throws InterruptedException, IOException {
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_JUST_NAME)) {
+            eventSource.open();
+
+            assertThat(eventSource.isOpen(), equalTo(true));
+
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
+        }
+
+        // incomplete event should be discarded
+        await()
+            .during(Duration.ofMillis(500L))
+            .until(events::isEmpty);
+
+        assertThat(events.size(), equalTo(0));
+    }
+
+    @Test
     public void testNoReconnectAndMixedEventsAreReceived() throws InterruptedException, IOException {
         try (SseEventSource eventSource = withNoReconnect(Type.EVENT_MIXED)) {
             eventSource.open();
@@ -478,11 +502,19 @@ public class SseEventSourceImplTest {
     }
     
     private SseEventSource withNoReconnect(Type type) {
-        return withNoReconnect(type, null);
+        return withNoReconnect(type, null, Collections.emptyMap());
+    }
+    
+    private SseEventSource withNoReconnect(Type type, Map<String, Object> properties) {
+        return withNoReconnect(type, null, properties);
     }
     
     private SseEventSource withNoReconnect(Type type, String lastEventId) {
-        SseEventSource eventSource = SseEventSource.target(target(type, lastEventId)).build();
+        return withNoReconnect(type, lastEventId, Collections.emptyMap());
+    }
+    
+    private SseEventSource withNoReconnect(Type type, String lastEventId, Map<String, Object> properties) {
+        SseEventSource eventSource = SseEventSource.target(target(type, lastEventId, properties)).build();
         eventSource.register(events::add, errors::add);
         return eventSource;
     }
@@ -500,7 +532,16 @@ public class SseEventSourceImplTest {
     }
 
     private static WebTarget target(Type type, String lastEventId) {
-        final WebTarget target = ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+        return target(type, lastEventId, Collections.emptyMap());
+    }
+    
+    private static WebTarget target(Type type, String lastEventId, Map<String, Object> properties) {
+        final Client client = ClientBuilder.newClient();
+        if (properties != null) {
+            properties.forEach(client::property);
+        }
+        
+        final WebTarget target = client.target(LOCAL_ADDRESS + type.name());
         if (lastEventId != null) {
             target.property(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
         }

Reply via email to