This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.2.x-fixes by this push:
     new e09cc41  CXF-7804: SSE SseBroadcaster implementation not performing onClose / onError callbacks
e09cc41 is described below

commit e09cc41d4c9e9a8a3b5662bc965c11e697910157
Author: reta <drr...@gmail.com>
AuthorDate: Mon Jul 23 18:27:12 2018 -0400

    CXF-7804: SSE SseBroadcaster implementation not performing onClose / onError callbacks
---
 rt/rs/sse/pom.xml                                  |  10 ++
 .../apache/cxf/jaxrs/sse/SseBroadcasterImpl.java   |   7 +-
 .../cxf/jaxrs/sse/SseBroadcasterImplTest.java      | 122 +++++++++++++++++++++
 3 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 19252b7..ae5fd64 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -54,6 +54,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>${cxf.servlet-api.group}</groupId>
             <artifactId>${cxf.servlet-api.artifact}</artifactId>
             <scope>provided</scope>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index d98962a..fc3b0ef 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -36,7 +36,7 @@ import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
 
-public class SseBroadcasterImpl implements SseBroadcaster {
+public final class SseBroadcasterImpl implements SseBroadcaster {
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
     private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
     private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
@@ -53,6 +53,8 @@ public class SseBroadcasterImpl implements SseBroadcaster {
             @Override
             public void onComplete(AsyncEvent asyncEvent) throws IOException {
                 subscribers.remove(sink);
+                // The SseEventSinkImpl completes the asynchronous operation on close() method call.
+                closers.forEach(closer -> closer.accept(sink));
             }
 
             @Override
@@ -63,6 +65,8 @@ public class SseBroadcasterImpl implements SseBroadcaster {
             @Override
             public void onError(AsyncEvent asyncEvent) throws IOException {
                 subscribers.remove(sink);
+                // Propagate the error from SseEventSinkImpl asynchronous context
+                exceptioners.forEach(exceptioner -> exceptioner.accept(sink, asyncEvent.getThrowable()));
             }
 
             @Override
@@ -107,7 +111,6 @@ public class SseBroadcasterImpl implements SseBroadcaster {
         if (closed.compareAndSet(false, true)) {
             subscribers.forEach(subscriber -> {
                 subscriber.close();
-                closers.forEach(closer -> closer.accept(subscriber));
             });
         }
     }
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java
new file mode 100644
index 0000000..9cfa782
--- /dev/null
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImplTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.LongAdder;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+
+import org.springframework.mock.web.MockAsyncContext;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SseBroadcasterImplTest extends Assert {
+    private SseBroadcaster broadcaster;
+    private MessageBodyWriter<OutboundSseEvent> writer;
+    private MockHttpServletResponse response;
+    private MockAsyncContext ctx;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() {
+        broadcaster = new SseBroadcasterImpl();
+        response = new MockHttpServletResponse();
+        writer = mock(MessageBodyWriter.class);
+        ctx = new MockAsyncContext(new MockHttpServletRequest(), response);
+    }
+
+    @Test
+    public void testOnCloseCallbackIsCalled() {
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx);
+        broadcaster.register(sink);
+
+        broadcaster.onClose(s -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+
+        sink.close();
+        assertThat(adder.intValue(), equalTo(1));
+    }
+
+    @Test
+    public void testOnCloseCallbackIsCalledForBroadcaster() {
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx);
+        broadcaster.register(sink);
+
+        broadcaster.onClose(s -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+
+        broadcaster.close();
+        assertThat(adder.intValue(), equalTo(1));
+    }
+
+    @Test
+    public void testOnErrorCallbackIsCalled() throws WebApplicationException, IOException {
+        when(writer.isWriteable(any(), any(), any(), any())).thenReturn(true);
+
+        final LongAdder adder = new LongAdder();
+        final SseEventSinkImpl sink = new SseEventSinkImpl(writer, null, ctx) {
+            @Override
+            public CompletionStage<?> send(OutboundSseEvent event) {
+                ctx.start(() -> {
+                    throw new RuntimeException("Failed to schedule async task");
+                });
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+        broadcaster.register(sink);
+
+        broadcaster.onError((s, ex) -> {
+            if (s == sink) {
+                adder.increment();
+            }
+        });
+        assertThat(adder.intValue(), equalTo(0));
+
+        broadcaster.broadcast(new OutboundSseEventImpl.BuilderImpl().build());
+        broadcaster.close();
+
+        assertThat(adder.intValue(), equalTo(1));
+    }
+}