This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.2.x-fixes by this push:
     new e09cc41  CXF-7804: SSE SseBroadcaster implementation not performing 
onClose / onError callbacks
e09cc41 is described below

commit e09cc41d4c9e9a8a3b5662bc965c11e697910157
Author: reta <drr...@gmail.com>
AuthorDate: Mon Jul 23 18:27:12 2018 -0400

    CXF-7804: SSE SseBroadcaster implementation not performing onClose / 
onError callbacks
---
 rt/rs/sse/pom.xml                                  |  10 ++
 .../apache/cxf/jaxrs/sse/SseBroadcasterImpl.java   |   7 +-
 .../cxf/jaxrs/sse/SseBroadcasterImplTest.java      | 122 +++++++++++++++++++++
 3 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 19252b7..ae5fd64 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -54,6 +54,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>${cxf.servlet-api.group}</groupId>
             <artifactId>${cxf.servlet-api.artifact}</artifactId>
             <scope>provided</scope>
diff --git 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index d98962a..fc3b0ef 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -36,7 +36,7 @@ import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
 
-public class SseBroadcasterImpl implements SseBroadcaster {
+public final class SseBroadcasterImpl implements SseBroadcaster {
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
     private final Set<Consumer<SseEventSink>> closers = new 
CopyOnWriteArraySet<>();
     private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new 
CopyOnWriteArraySet<>();
@@ -53,6 +53,8 @@ public class SseBroadcasterImpl implements SseBroadcaster {
             @Override
             public void onComplete(AsyncEvent asyncEvent) throws IOException {
                 subscribers.remove(sink);
+                // The SseEventSinkImpl completes the asynchronous operation 
on close() method call.
+                closers.forEach(closer -> closer.accept(sink));
             }
 
             @Override
@@ -63,6 +65,8 @@ public class SseBroadcasterImpl implements SseBroadcaster {
             @Override
             public void onError(AsyncEvent asyncEvent) throws IOException {
                 subscribers.remove(sink);
+                // Propagate the error from SseEventSinkImpl asynchronous 
context
+                exceptioners.forEach(exceptioner -> exceptioner.accept(sink, 
asyncEvent.getThrowable()));
             }
 
             @Override
@@ -107,7 +111,6 @@ public class SseBroadcasterImpl implements SseBroadcaster {
         if (closed.compareAndSet(false, true)) {
             subscribers.forEach(subscriber -> {
                 subscriber.close();
-                closers.forEach(closer -> closer.accept(subscriber));
             });
         }
     }
diff --git 
a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java 
b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java
new file mode 100644
index 0000000..9cfa782
--- /dev/null
+++ 
b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.LongAdder;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+
+import org.springframework.mock.web.MockAsyncContext;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SseBroadcasterImplTest extends Assert {
+    private SseBroadcaster broadcaster;
+    private MessageBodyWriter<OutboundSseEvent> writer;
+    private MockHttpServletResponse response;
+    private MockAsyncContext ctx;
+    
+    @SuppressWarnings("unchecked") 
+    @Before
+    public void setUp() {
+        broadcaster = new SseBroadcasterImpl();
+        response = new MockHttpServletResponse();
+        writer = mock(MessageBodyWriter.class);
+        ctx = new MockAsyncContext(new MockHttpServletRequest(), response);
+    }
+
+    @Test
+    public void testOnCloseCallbackIsCalled() {
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx);
+        broadcaster.register(sink);
+        
+        broadcaster.onClose(s -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+        
+        sink.close();
+        assertThat(adder.intValue(), equalTo(1));
+    }
+    
+    @Test
+    public void testOnCloseCallbackIsCalledForBroadcaster() {
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx);
+        broadcaster.register(sink);
+        
+        broadcaster.onClose(s -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+        
+        broadcaster.close();
+        assertThat(adder.intValue(), equalTo(1));
+    }
+
+    @Test
+    public void testOnErrorCallbackIsCalled() throws WebApplicationException, 
IOException {
+        when(writer.isWriteable(any(), any(), any(), any())).thenReturn(true);
+        
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx) {
+            @Override
+            public CompletionStage<?> send(OutboundSseEvent event) {
+                ctx.start(() -> { 
+                    throw new RuntimeException("Failed to schedule async 
task");
+                });
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+        broadcaster.register(sink);
+        
+        broadcaster.onError((s, ex) -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+        
+        broadcaster.broadcast(new OutboundSseEventImpl.BuilderImpl().build());
+        broadcaster.close();
+        
+        assertThat(adder.intValue(), equalTo(1));
+    }
+}

Reply via email to