SSE servlet implementation

Using AsyncContext instead of Atmosphere.
Borrows code from CXF's Atmosphere implementation.


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/67a4cf16
Tree: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/67a4cf16
Diff: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/67a4cf16

Branch: refs/heads/master
Commit: 67a4cf1678b163b34c5568381d3cf4ed12c78438
Parents: 7632079
Author: Carlos Sierra <csie...@apache.org>
Authored: Fri Mar 9 16:14:58 2018 +0100
Committer: Carlos Sierra <csie...@apache.org>
Committed: Tue Mar 13 18:16:32 2018 +0100

----------------------------------------------------------------------
 jax-rs.itests/src/main/java/test/JaxrsTest.java |  54 ++++++
 .../src/main/java/test/types/TestHelper.java    |  19 ++-
 .../java/test/types/TestSSEApplication.java     |  74 +++++++++
 jax-rs.whiteboard/bnd.bnd                       |   2 +
 jax-rs.whiteboard/pom.xml                       |   6 +
 .../activator/CxfJaxrsBundleActivator.java      |  48 +++++-
 .../jax/rs/whiteboard/internal/Whiteboard.java  |   1 -
 .../cxf/CxfJaxrsServiceRegistrator.java         |   5 +
 .../cxf/sse/OutboundSseEventBodyWriter.java     | 123 ++++++++++++++
 .../internal/cxf/sse/OutboundSseEventImpl.java  | 164 +++++++++++++++++++
 .../internal/cxf/sse/SseBroadcasterImpl.java    | 105 ++++++++++++
 .../internal/cxf/sse/SseContextProvider.java    |  13 ++
 .../cxf/sse/SseEventSinkContextProvider.java    |  81 +++++++++
 .../internal/cxf/sse/SseEventSinkImpl.java      | 129 +++++++++++++++
 .../rs/whiteboard/internal/cxf/sse/SseImpl.java |  20 +++
 15 files changed, 841 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.itests/src/main/java/test/JaxrsTest.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/JaxrsTest.java 
b/jax-rs.itests/src/main/java/test/JaxrsTest.java
index c367456..6c3301a 100644
--- a/jax-rs.itests/src/main/java/test/JaxrsTest.java
+++ b/jax-rs.itests/src/main/java/test/JaxrsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.*;
 
 import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +50,7 @@ import org.osgi.framework.ServiceFactory;
 import org.osgi.framework.ServiceRegistration;
 
 import org.osgi.service.jaxrs.client.PromiseRxInvoker;
+import org.osgi.service.jaxrs.client.SseEventSourceFactory;
 import org.osgi.service.jaxrs.runtime.JaxrsServiceRuntime;
 import org.osgi.service.jaxrs.runtime.dto.ApplicationDTO;
 import org.osgi.service.jaxrs.runtime.dto.DTOConstants;
@@ -70,9 +72,11 @@ import test.types.TestAsyncResource;
 import test.types.TestFilter;
 import test.types.TestFilterAndExceptionMapper;
 import test.types.TestHelper;
+import test.types.TestSSEApplication;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.container.ContainerResponseFilter;
@@ -81,6 +85,7 @@ import javax.ws.rs.core.Feature;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.sse.SseEventSource;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -1703,6 +1708,54 @@ public class JaxrsTest extends TestHelper {
 
         assertEquals(0, runtimeDTO.failedExtensionDTOs.length);
     }
+
+    @Test
+    public void testSSEApplication() throws 
+        InterruptedException, MalformedURLException {
+        
+        registerApplication(
+            new TestSSEApplication(), JAX_RS_APPLICATION_BASE, "/sse");
+
+        SseEventSourceFactory sseFactory = createSseFactory();
+
+        SseEventSource source1 = sseFactory.newSource(
+            createDefaultTarget().path("/sse").path("/subscribe"));
+
+        SseEventSource source2 = sseFactory.newSource(
+                createDefaultTarget().path("/sse").path("/subscribe"));
+
+        ArrayList<String> source1Events = new ArrayList<>();
+        ArrayList<String> source2Events = new ArrayList<>();
+
+        source1.register(event -> 
source1Events.add(event.readData(String.class)));
+        source2.register(event -> 
source2Events.add(event.readData(String.class)));
+
+        source1.open();
+        source2.open();
+
+        WebTarget broadcast = createDefaultTarget().path("/sse").path(
+            "/broadcast");
+
+        broadcast.request().post(
+            Entity.entity("message", MediaType.TEXT_PLAIN_TYPE));
+
+        source2.close();
+
+        assertEquals(Arrays.asList("welcome", "message"), source1Events);
+        assertEquals(Arrays.asList("welcome", "message"), source2Events);
+
+        broadcast.request().post(
+            Entity.entity("another message", MediaType.TEXT_PLAIN_TYPE));
+
+        assertEquals(
+            Arrays.asList("welcome", "message", "another message"),
+            source1Events);
+        assertEquals(Arrays.asList("welcome", "message"), source2Events);
+
+        source1.close();
+    }
+
+
     private static Function<RuntimeDTO, FailedApplicationDTO[]>
         FAILED_APPLICATIONS = r -> r.failedApplicationDTOs;
     private Collection<ServiceRegistration<?>> _registrations =
@@ -1982,4 +2035,5 @@ public class JaxrsTest extends TestHelper {
 
         return serviceRegistration;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.itests/src/main/java/test/types/TestHelper.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/types/TestHelper.java 
b/jax-rs.itests/src/main/java/test/types/TestHelper.java
index 35ad671..a5c5869 100644
--- a/jax-rs.itests/src/main/java/test/types/TestHelper.java
+++ b/jax-rs.itests/src/main/java/test/types/TestHelper.java
@@ -26,7 +26,7 @@ import org.junit.Before;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.ServiceReference;
-import org.osgi.service.jaxrs.runtime.JaxrsServiceRuntime;
+import org.osgi.service.jaxrs.client.SseEventSourceFactory;
 import org.osgi.service.jaxrs.runtime.JaxrsServiceRuntime;
 import org.osgi.util.tracker.ServiceTracker;
 
@@ -60,6 +60,11 @@ public class TestHelper {
 
         _clientBuilderTracker.open();
 
+        _sseEventSourceFactoryTracker = new ServiceTracker<>(
+            bundleContext, SseEventSourceFactory.class, null);
+
+        _sseEventSourceFactoryTracker.open();
+
         _runtimeTracker = new ServiceTracker<>(
             bundleContext, JaxrsServiceRuntime.class, null);
 
@@ -75,6 +80,9 @@ public class TestHelper {
         _runtimeServiceReference = _runtimeTracker.getServiceReference();
     }
 
+    private ServiceTracker<SseEventSourceFactory, SseEventSourceFactory>
+        _sseEventSourceFactoryTracker;
+
     @SuppressWarnings("unchecked")
        private static String[] canonicalize(Object propertyValue) {
         if (propertyValue == null) {
@@ -103,6 +111,15 @@ public class TestHelper {
         }
     }
 
+    protected SseEventSourceFactory createSseFactory() {
+        try {
+            return _sseEventSourceFactoryTracker.waitForService(5000);
+        }
+        catch (InterruptedException ie) {
+            throw new RuntimeException(ie);
+        }
+    }
+
     protected WebTarget createDefaultTarget() {
         Client client = createClient();
 

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.itests/src/main/java/test/types/TestSSEApplication.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/types/TestSSEApplication.java 
b/jax-rs.itests/src/main/java/test/types/TestSSEApplication.java
new file mode 100644
index 0000000..56160ae
--- /dev/null
+++ b/jax-rs.itests/src/main/java/test/types/TestSSEApplication.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.types;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+public class TestSSEApplication extends Application {
+
+    @Override
+    public Set<Object> getSingletons() {
+        return Collections.singleton(new SSEResource());
+    }
+
+    public static class SSEResource {
+
+        @Produces(MediaType.SERVER_SENT_EVENTS)
+        @GET
+        @Path("/subscribe")
+        public void subscribe(@Context SseEventSink sink) {
+            sink.send(_sse.newEvent("welcome"));
+
+            _sseBroadcaster.register(sink);
+        }
+
+        @POST
+        @Path("/broadcast")
+        public void broadcast(String body)
+            throws ExecutionException, InterruptedException {
+
+            CompletionStage<?> broadcast = _sseBroadcaster.broadcast(
+                _sse.newEvent(body));
+
+            broadcast.toCompletableFuture().get();
+        }
+
+        @Context
+        public void setSse(Sse sse) {
+            _sse = sse;
+            _sseBroadcaster = _sse.newBroadcaster();
+        }
+
+        private Sse _sse;
+        private SseBroadcaster _sseBroadcaster;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/bnd.bnd
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/bnd.bnd b/jax-rs.whiteboard/bnd.bnd
index 9d84e26..d4bd24d 100644
--- a/jax-rs.whiteboard/bnd.bnd
+++ b/jax-rs.whiteboard/bnd.bnd
@@ -37,6 +37,7 @@ Import-Package:\
     !org.apache.xerces.*,\
     !org.apache.xml.resolver.*,\
     !org.apache.xmlbeans.*,\
+    !org.atmosphere.*,\
     !org.codehaus.stax2.*,\
     !org.dom4j.*,\
     !org.junit.*,\
@@ -64,6 +65,7 @@ Import-Package:\
     lib/cxf-rt-frontend-jaxrs.jar=cxf-rt-frontend-jaxrs-*.jar;lib:=true,\
     lib/cxf-rt-rs-client.jar=cxf-rt-rs-client-*.jar;lib:=true,\
     
lib/cxf-rt-rs-extension-providers.jar=cxf-rt-rs-extension-providers-*.jar;lib:=true,\
+    lib/cxf-rt-rs-sse.jar=cxf-rt-rs-sse-*.jar;lib:=true,\
     lib/cxf-rt-transports-http.jar=cxf-rt-transports-http-*.jar;lib:=true,\
     lib/cxf-tools-common.jar=cxf-tools-common-*.jar;lib:=true,\
     lib/jettison.jar=jettison-*.jar;lib:=true,\

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/pom.xml
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/pom.xml b/jax-rs.whiteboard/pom.xml
index d73148e..6c2cf4d 100644
--- a/jax-rs.whiteboard/pom.xml
+++ b/jax-rs.whiteboard/pom.xml
@@ -92,6 +92,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-sse</artifactId>
+            <version>${cxf.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-tools-common</artifactId>
             <version>${cxf.version}</version>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/activator/CxfJaxrsBundleActivator.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/activator/CxfJaxrsBundleActivator.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/activator/CxfJaxrsBundleActivator.java
index 8632bc0..26024f5 100644
--- 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/activator/CxfJaxrsBundleActivator.java
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/activator/CxfJaxrsBundleActivator.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map;
 
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.ext.RuntimeDelegate;
+import javax.ws.rs.sse.SseEventSource;
 
 import org.apache.aries.jax.rs.whiteboard.internal.client.ClientBuilderFactory;
 import org.apache.aries.jax.rs.whiteboard.internal.utils.PropertyHolder;
@@ -35,6 +37,7 @@ import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.service.http.runtime.HttpServiceRuntime;
+import org.osgi.service.jaxrs.client.SseEventSourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +86,7 @@ public class CxfJaxrsBundleActivator implements 
BundleActivator {
         _defaultOSGiResult =
             all(
                 ignore(registerClient()),
+                ignore(registerSseEventSourceFactory()),
                 ignore(runWhiteboard(bundleContext, defaultConfiguration))
             )
         .run(bundleContext);
@@ -106,9 +110,9 @@ public class CxfJaxrsBundleActivator implements 
BundleActivator {
             _log.debug("Stopped whiteboard factory");
         }
     }
+
     private OSGiResult _defaultOSGiResult;
     private OSGiResult _whiteboardsResult;
-
     private static String endpointFilter(PropertyHolder configuration ) {
 
         Object whiteBoardTargetProperty = configuration.get(
@@ -162,4 +166,46 @@ public class CxfJaxrsBundleActivator implements 
BundleActivator {
             (Map<String, Object>) null);
     }
 
+    private static OSGi<?> registerSseEventSourceFactory() {
+        ClassLoader classLoader = 
CxfJaxrsBundleActivator.class.getClassLoader();
+
+        return register(
+            SseEventSourceFactory.class, new SseEventSourceFactory() {
+                @Override
+                public SseEventSource.Builder newBuilder(WebTarget target) {
+                    Thread thread = Thread.currentThread();
+
+                    ClassLoader contextClassLoader =
+                        thread.getContextClassLoader();
+
+                    thread.setContextClassLoader(classLoader);
+
+                    try {
+                        return SseEventSource.target(target);
+                    }
+                    finally {
+                        thread.setContextClassLoader(contextClassLoader);
+                    }
+                }
+
+                @Override
+                public SseEventSource newSource(WebTarget target) {
+                    Thread thread = Thread.currentThread();
+
+                    ClassLoader contextClassLoader =
+                        thread.getContextClassLoader();
+
+                    thread.setContextClassLoader(classLoader);
+
+                    try {
+                        return SseEventSource.target(target).build();
+                    }
+                    finally {
+                        thread.setContextClassLoader(contextClassLoader);
+                    }
+                }
+            },
+            new Hashtable<>());
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Whiteboard.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Whiteboard.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Whiteboard.java
index 349be5e..ff09549 100644
--- 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Whiteboard.java
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Whiteboard.java
@@ -25,7 +25,6 @@ import 
org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.commons.collections.Factory;
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.ExtensionManagerBus;
 import org.apache.cxf.transport.servlet.CXFNonSpringServlet;

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
index 10715e8..e27a9bd 100644
--- 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
@@ -40,6 +40,8 @@ import javax.ws.rs.core.Feature;
 import javax.ws.rs.core.FeatureContext;
 import javax.ws.rs.ext.RuntimeDelegate;
 
+import org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseContextProvider;
+import 
org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseEventSinkContextProvider;
 import 
org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceReferenceResourceProvider;
 import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
 import org.apache.aries.osgi.functional.CachingServiceReference;
@@ -255,6 +257,9 @@ public class CxfJaxrsServiceRegistrator {
                     classesWithPriorities));
         }
 
+        _jaxRsServerFactoryBean.setProvider(new SseEventSinkContextProvider());
+        _jaxRsServerFactoryBean.setProvider(new SseContextProvider());
+
         for (ResourceProvider resourceProvider: _services) {
             _jaxRsServerFactoryBean.setResourceProvider(resourceProvider);
         }

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
new file mode 100644
index 0000000..b6459e6
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
@@ -0,0 +1,123 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+@Provider
+public class OutboundSseEventBodyWriter implements 
MessageBodyWriter<OutboundSseEvent> {
+    public static final String SERVER_SENT_EVENTS = "text/event-stream";
+    public static final MediaType SERVER_SENT_EVENTS_TYPE = 
MediaType.valueOf(SERVER_SENT_EVENTS);
+
+    private static final byte[] COMMENT = ": 
".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EVENT = "event: 
".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RETRY = "retry: 
".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] DATA = "data: 
".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] NEW_LINE = 
"\n".getBytes(StandardCharsets.UTF_8);
+
+    private ServerProviderFactory factory;
+    private Message message;
+
+    protected OutboundSseEventBodyWriter() {
+    }
+
+    public OutboundSseEventBodyWriter(final ServerProviderFactory factory, 
final Exchange exchange) {
+        this.factory = factory;
+        this.message = new MessageImpl();
+        this.message.setExchange(exchange);
+    }
+
+
+    @Override
+    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, 
MediaType mt) {
+        return OutboundSseEvent.class.isAssignableFrom(cls) || 
SERVER_SENT_EVENTS_TYPE.isCompatible(mt);
+    }
+
+    @Override
+    public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] 
anns,
+            MediaType mt, MultivaluedMap<String, Object> headers, OutputStream 
os)
+                throws IOException, WebApplicationException {
+
+        if (p.getName() != null) {
+            os.write(EVENT);
+            os.write(p.getName().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+
+        if (p.getId() != null) {
+            os.write(ID);
+            os.write(p.getId().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+
+        if (p.getComment() != null) {
+            os.write(COMMENT);
+            os.write(p.getComment().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+
+        if (p.getReconnectDelay() > 0) {
+            os.write(RETRY);
+            
os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+
+        if (p.getData() != null) {
+            Class<?> payloadClass = p.getType();
+            Type payloadType = p.getGenericType();
+            if (payloadType == null) {
+                payloadType = payloadClass;
+            }
+
+            if (payloadType == null && payloadClass == null) {
+                payloadType = Object.class;
+                payloadClass = Object.class;
+            }
+
+            os.write(DATA);
+            writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), 
headers, p.getData(), os);
+            os.write(NEW_LINE);
+        }
+
+        os.write(NEW_LINE);
+    }
+
+    @SuppressWarnings("unchecked")
+    private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, 
MediaType mt,
+            MultivaluedMap<String, Object> headers, Object data, OutputStream 
os)
+                throws IOException, WebApplicationException {
+
+        MessageBodyWriter<T> writer = null;
+        if (message != null && factory != null) {
+            writer = factory.createMessageBodyWriter(cls, type, anns, mt, 
message);
+        }
+
+        if (writer == null) {
+            throw new InternalServerErrorException("No suitable message body 
writer for class: " + cls.getName());
+        }
+
+        writer.writeTo((T)data, cls, type, anns, mt, headers, os);
+    }
+
+    @Override
+    public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, 
Annotation[] annotations,
+            MediaType mediaType) {
+        return -1;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
new file mode 100644
index 0000000..321afbc
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
@@ -0,0 +1,164 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+public final class OutboundSseEventImpl implements OutboundSseEvent {
+    private final String id;
+    private final String name;
+    private final String comment;
+    private final long reconnectDelay;
+    private final Class<?> type;
+    private final Type genericType;
+    private final MediaType mediaType;
+    private final Object data;
+
+    public static class BuilderImpl implements Builder {
+        private String id;
+        private String name;
+        private String comment;
+        private long reconnectDelay = -1;
+        private Class<?> type;
+        private Type genericType;
+        private MediaType mediaType = MediaType.TEXT_PLAIN_TYPE;
+        private Object data;
+
+        @Override
+        public Builder id(String newId) {
+            this.id = newId;
+            return this;
+        }
+
+        @Override
+        public Builder name(String newName) {
+            this.name = newName;
+            return this;
+        }
+
+        @Override
+        public Builder reconnectDelay(long milliseconds) {
+            this.reconnectDelay = milliseconds;
+            return this;
+        }
+
+        @Override
+        public Builder mediaType(MediaType newMediaType) {
+            this.mediaType = newMediaType;
+            return this;
+        }
+
+        @Override
+        public Builder comment(String newComment) {
+            this.comment = newComment;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Builder data(Class newType, Object newData) {
+            if (newType == null || newData == null) {
+                throw new IllegalArgumentException("Parameters 'type' and 
'data' must not be null.");
+            }
+            this.type = newType;
+            this.data = newData;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Builder data(GenericType newType, Object newData) {
+            if (newType == null || newData == null) {
+                throw new IllegalArgumentException("Parameters 'type' and 
'data' must not be null.");
+            }
+            this.genericType = newType.getType();
+            this.data = newData;
+            return this;
+        }
+
+        @Override
+        public Builder data(Object newData) {
+            if (newData == null) {
+                throw new IllegalArgumentException("Parameter 'data' must not 
be null.");
+            }
+            this.type = newData.getClass();
+            this.data = newData;
+            return this;
+        }
+
+        @Override
+        public OutboundSseEvent build() {
+            return new OutboundSseEventImpl(
+                id,
+                name,
+                comment,
+                reconnectDelay,
+                type,
+                genericType,
+                mediaType,
+                data
+            );
+        }
+
+    }
+    //CHECKSTYLE:OFF
+    private OutboundSseEventImpl(String id, String name, String comment, long 
reconnectDelay,
+            Class<?> type, Type genericType, MediaType mediaType, Object data) 
{
+        this.id = id;
+        this.name = name;
+        this.comment = comment;
+        this.reconnectDelay = reconnectDelay;
+        this.type = type;
+        this.genericType = genericType;
+        this.mediaType = mediaType;
+        this.data = data;
+    }
+    //CHECKSTYLE:ON
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getComment() {
+        return comment;
+    }
+
+    @Override
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    @Override
+    public boolean isReconnectDelaySet() {
+        return reconnectDelay != -1;
+    }
+
+    @Override
+    public Class<?> getType() {
+        return type;
+    }
+
+    @Override
+    public Type getGenericType() {
+        return genericType;
+    }
+
+    @Override
+    public MediaType getMediaType() {
+        return mediaType;
+    }
+
+    @Override
+    public Object getData() {
+        return data;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
new file mode 100644
index 0000000..07249b0
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
@@ -0,0 +1,105 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+public class SseBroadcasterImpl implements SseBroadcaster {
+    private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
+
+    private final Set<Consumer<SseEventSink>> closers =
+            new CopyOnWriteArraySet<>();
+
+    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
+            new CopyOnWriteArraySet<>();
+
+    @Override
+    public void register(SseEventSink sink) {
+        if (closed) throw new IllegalStateException("Already closed");
+
+        SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+
+        AsyncContext asyncContext = sinkImpl.getAsyncContext();
+
+        asyncContext.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException 
{
+
+            }
+        });
+
+        subscribers.add(sink);
+    }
+
+    @Override
+    public CompletionStage<?> broadcast(OutboundSseEvent event) {
+        if (closed) throw new IllegalStateException("Already closed");
+
+        final Collection<CompletableFuture<?>> futures = new ArrayList<>();
+        
+        for (SseEventSink sink: subscribers) {
+            try {
+                futures.add(sink.send(event).toCompletableFuture());
+            } catch (final Exception ex) {
+                exceptioners.forEach(
+                    exceptioner -> exceptioner.accept(sink, ex));
+            }
+        }
+        
+        return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]));
+    }
+
+    @Override
+    public void onClose(Consumer<SseEventSink> subscriber) {
+        if (closed) throw new IllegalStateException("Already closed");
+
+        closers.add(subscriber);
+    }
+
+    @Override
+    public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+        if (closed) throw new IllegalStateException("Already closed");
+
+        exceptioners.add(exceptioner);
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+
+        subscribers.forEach(subscriber -> {
+            subscriber.close();
+            closers.forEach(closer -> closer.accept(subscriber));
+        });
+    }
+
+    private volatile boolean closed;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
new file mode 100644
index 0000000..3760769
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
@@ -0,0 +1,13 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.ws.rs.sse.Sse;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.message.Message;
+
+/**
+ * CXF {@link ContextProvider} that supplies the {@link Sse} instance
+ * requested by resources.
+ */
+public class SseContextProvider implements ContextProvider<Sse> {
+    /**
+     * Creates a new {@link SseImpl} on every call; the message itself is not
+     * consulted.
+     *
+     * @param message the current CXF message (unused)
+     * @return a fresh {@link Sse} instance
+     */
+    @Override
+    public Sse createContext(Message message) {
+        final Sse sse = new SseImpl();
+        return sse;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
new file mode 100644
index 0000000..1ae629f
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
@@ -0,0 +1,81 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * CXF {@link ContextProvider} that builds the {@link SseEventSink} for the
+ * current request on top of a servlet {@link AsyncContext}.
+ */
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
+    /**
+     * Switches the current request to asynchronous processing and wraps the
+     * resulting context in an {@link SseEventSinkImpl}.
+     *
+     * <p>The async timeout is set to {@code 0}, and an interceptor that
+     * suspends the interceptor chain is added to the message's chain.
+     *
+     * @param message the current CXF message
+     * @return the sink bound to this request's async context
+     * @throws IllegalStateException if the message carries no HTTP request
+     */
+    @Override
+    public SseEventSink createContext(Message message) {
+        final Object rawRequest =
+            message.get(AbstractHTTPDestination.HTTP_REQUEST);
+        if (rawRequest == null) {
+            throw new IllegalStateException(
+                "Unable to retrieve HTTP request from the context");
+        }
+        final HttpServletRequest httpRequest = (HttpServletRequest) rawRequest;
+
+        final MessageBodyWriter<OutboundSseEvent> eventWriter =
+            new OutboundSseEventBodyWriter(
+                ServerProviderFactory.getInstance(message),
+                message.getExchange());
+
+        final AsyncContext asyncContext = httpRequest.startAsync();
+        asyncContext.setTimeout(0);
+
+        message.getInterceptorChain().add(new SuspendPhaseInterceptor());
+
+        return new SseEventSinkImpl(eventWriter, asyncContext);
+    }
+
+    /**
+     * Interceptor registered for the {@code POST_INVOKE} phase, ahead of
+     * CXF's {@code OutgoingChainInterceptor}, whose only action is to suspend
+     * the interceptor chain.
+     */
+    private static class SuspendPhaseInterceptor
+        implements PhaseInterceptor<Message> {
+
+        @Override
+        public String getId() {
+            return "SSE SUSPEND";
+        }
+
+        @Override
+        public String getPhase() {
+            return Phase.POST_INVOKE;
+        }
+
+        @Override
+        public Set<String> getBefore() {
+            final String outgoingChain =
+                "org.apache.cxf.interceptor.OutgoingChainInterceptor";
+            return Collections.singleton(outgoingChain);
+        }
+
+        @Override
+        public Set<String> getAfter() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Collection<PhaseInterceptor<? extends Message>> getAdditionalInterceptors() {
+            return Collections.emptySet();
+        }
+
+        /** Suspends the chain this message is travelling through. */
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            message.getInterceptorChain().suspend();
+        }
+
+        /** Faults need no handling here. */
+        @Override
+        public void handleFault(Message message) {
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
new file mode 100644
index 0000000..c14f193
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
@@ -0,0 +1,129 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+/**
+ * {@link SseEventSink} that writes events to the response of a servlet
+ * {@link AsyncContext}.
+ *
+ * <p>Events are queued by {@link #send(OutboundSseEvent)} and written in
+ * order by a single drain task started through {@link AsyncContext#start}.
+ * The queue and the {@code dequeueing} flag are guarded by this object's
+ * monitor.
+ */
+public class SseEventSinkImpl implements SseEventSink {
+    private static final Logger LOG =
+        LogUtils.getL7dLogger(SseEventSinkImpl.class);
+
+    /** An event waiting to be written, plus the future reported to the sender. */
+    private static class QueuedEvent {
+        final OutboundSseEvent event;
+        final CompletableFuture<?> completion;
+
+        public QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
+    }
+
+    private final AsyncContext ctx;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private final Queue<QueuedEvent> queuedEvents;
+    /** True while a drain task is running or scheduled; guarded by {@code this}. */
+    private boolean dequeueing;
+
+    private volatile boolean closed;
+
+    /**
+     * Creates a sink on top of an async context and sets the response content
+     * type to {@code OutboundSseEventBodyWriter.SERVER_SENT_EVENTS}.
+     *
+     * @param writer writer used to serialise events; if {@code null},
+     *        {@link #send(OutboundSseEvent)} drops events
+     * @param ctx async context whose response receives the events
+     * @throws IllegalStateException if {@code ctx} is {@code null}
+     */
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+            final AsyncContext ctx) {
+        // Validate before touching any state.
+        if (ctx == null) {
+            throw new IllegalStateException(
+                "Unable to retrieve the AsyncContext for this request. "
+                    + "Is the Servlet configured properly?");
+        }
+
+        this.writer = writer;
+        this.queuedEvents = new LinkedList<>();
+        this.ctx = ctx;
+
+        ctx.getResponse().setContentType(
+            OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+    }
+
+    /**
+     * Returns the async context this sink writes to.
+     *
+     * @return the context passed to the constructor
+     */
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
+    /**
+     * Completes the async context. Only the first call has an effect; a
+     * failure to complete the context is logged and not propagated.
+     */
+    @Override
+    public void close() {
+        // Check and flip the flag in one step so that concurrent callers
+        // cannot both reach ctx.complete().
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+
+        try {
+            ctx.complete();
+        } catch (final Exception ex) {
+            // Pass the exception itself so the stack trace is not lost.
+            LOG.log(java.util.logging.Level.WARNING,
+                "Failed to close the AsyncContext cleanly: "
+                    + ex.getMessage(),
+                ex);
+        }
+    }
+
+    /**
+     * Queues an event for writing and makes sure a drain task is running.
+     *
+     * @param event the event to send
+     * @return a stage completed with {@code null} once the event has been
+     *         written and flushed, or exceptionally if writing failed. If the
+     *         sink is closed or has no writer, the stage is completed with
+     *         {@code null} straight away and nothing is written.
+     * @throws RuntimeException whatever {@link AsyncContext#start} throws if
+     *         the drain task cannot be started; all queued events are failed
+     *         with the same exception first
+     */
+    @Override
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (closed || writer == null) {
+            future.complete(null);
+            return future;
+        }
+
+        final boolean startDequeue;
+        synchronized (this) {
+            queuedEvents.offer(new QueuedEvent(event, future));
+            startDequeue = !dequeueing;
+            dequeueing = true;
+        }
+
+        if (startDequeue) {
+            try {
+                ctx.start(this::dequeue);
+            } catch (final RuntimeException ex) {
+                // Without this the flag would stay set, no later send()
+                // would start a drain task, and the queued futures would
+                // never complete.
+                failPending(ex);
+                throw ex;
+            }
+        }
+
+        return future;
+    }
+
+    /**
+     * Empties the queue, clears the {@code dequeueing} flag and fails every
+     * removed event's future with the given cause.
+     */
+    private void failPending(Throwable cause) {
+        final Queue<QueuedEvent> abandoned = new LinkedList<>();
+        synchronized (this) {
+            QueuedEvent qe;
+            while ((qe = queuedEvents.poll()) != null) {
+                abandoned.offer(qe);
+            }
+            dequeueing = false;
+        }
+
+        // Complete outside the lock: completion may run caller callbacks.
+        for (QueuedEvent qe : abandoned) {
+            qe.completion.completeExceptionally(cause);
+        }
+    }
+
+    /**
+     * Drain task: writes queued events one by one until the queue is empty,
+     * completing each event's future with the outcome of its write.
+     */
+    private void dequeue() {
+        for (;;) {
+            QueuedEvent qe;
+            synchronized (this) {
+                qe = queuedEvents.poll();
+                if (qe == null) {
+                    // Clear the flag under the same lock as the poll so a
+                    // concurrent send() either sees its event taken or
+                    // starts a new drain task.
+                    dequeueing = false;
+                    break;
+                }
+            }
+            OutboundSseEvent event = qe.event;
+            CompletableFuture<?> future = qe.completion;
+
+            try {
+                writer.writeTo(event, event.getClass(),
+                    event.getGenericType(), new Annotation [] {},
+                    event.getMediaType(), null,
+                    ctx.getResponse().getOutputStream());
+                ctx.getResponse().flushBuffer();
+                future.complete(null);
+            } catch (final Exception ex) {
+                future.completeExceptionally(ex);
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} once {@link #close()} has been called
+     */
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/67a4cf16/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
new file mode 100644
index 0000000..529df07
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
@@ -0,0 +1,20 @@
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+
+/**
+ * Package-private {@link Sse} entry point that hands out this package's
+ * event builder and broadcaster implementations.
+ */
+class SseImpl implements Sse {
+    SseImpl() {
+    }
+
+    /**
+     * @return a new {@code OutboundSseEventImpl.BuilderImpl}
+     */
+    @Override
+    public Builder newEventBuilder() {
+        final Builder builder = new OutboundSseEventImpl.BuilderImpl();
+        return builder;
+    }
+
+    /**
+     * @return a new, independent {@link SseBroadcasterImpl}
+     */
+    @Override
+    public SseBroadcaster newBroadcaster() {
+        final SseBroadcaster broadcaster = new SseBroadcasterImpl();
+        return broadcaster;
+    }
+}
\ No newline at end of file

Reply via email to