http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/distribution/src/main/release/samples/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml index d7c5587..f9c5847 100644 --- a/distribution/src/main/release/samples/pom.xml +++ b/distribution/src/main/release/samples/pom.xml @@ -113,6 +113,7 @@ <module>jax_rs/tracing_htrace</module> <module>clustering/failover_jaxws_osgi</module> <module>clustering/failover_server</module> + <module>jax_rs/sse</module> <!-- These are removed from the build as they currently don't inherit the parent from
http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index b6d9000..594213b 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -109,7 +109,7 @@ <cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version> <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version> <cxf.javassist.version>3.19.0-GA</cxf.javassist.version> - <cxf.javax.ws.rs.version>2.0.1</cxf.javax.ws.rs.version> + <cxf.javax.ws.rs.version>2.1-SNAPSHOT</cxf.javax.ws.rs.version> <cxf.jaxb.version>2.2.11</cxf.jaxb.version> <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version> <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version> @@ -2184,4 +2184,19 @@ </build> </profile> </profiles> + + <!-- Temporarily only till JAX-RS 2.1 artifacts become available --> + <repositories> + <repository> + <id>maven.java.net</id> + <name>java.net snapshots</name> + <url>https://maven.java.net/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml index a79671e..765330d 100644 --- a/rt/rs/pom.xml +++ b/rt/rs/pom.xml @@ -37,5 +37,6 @@ <module>extensions/providers</module> <module>extensions/search</module> <module>security</module> + <module>sse</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml new file mode 100644 index 0000000..43e5c66 --- /dev/null +++ b/rt/rs/sse/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>cxf-rt-rs-sse</artifactId> + <packaging>bundle</packaging> + <name>Apache CXF JAX-RS Server-Side Events Support</name> + <description>Apache CXF JAX-RS Server-Side Events Support</description> + <url>http://cxf.apache.org</url> + <parent> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-parent</artifactId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../../../parent/pom.xml</relativePath> + </parent> + <properties> + <cxf.osgi.import> + javax.servlet*;version="${cxf.osgi.javax.servlet.version}", + </cxf.osgi.import> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${cxf.servlet-api.group}</groupId> + <artifactId>${cxf.servlet-api.artifact}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.atmosphere</groupId> + <artifactId>atmosphere-runtime</artifactId> + <version>${cxf.atmosphere.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java new file mode 100644 index 0000000..4a9b3aa --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.OutboundSseEvent; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; + +@Provider +public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> { + public static final String SERVER_SENT_EVENTS = "text/event-stream"; + public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS); + + private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8); + private static final byte[] EVENT = " ".getBytes(StandardCharsets.UTF_8); + private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8); + + private ServerProviderFactory factory; + private Message message; + + protected OutboundSseEventBodyWriter() { + } + + public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) { + this.factory = factory; + this.message = new MessageImpl(); + this.message.setExchange(exchange); + } + + + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt); + } + + @Override + public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns, + MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) + throws IOException, WebApplicationException { + + if (p.getName() != null) { + os.write(EVENT); + os.write(p.getName().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getId() != null) { + os.write(ID); + os.write(p.getId().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getComment() != null) { + os.write(COMMENT); + os.write(p.getComment().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getReconnectDelay() > 0) { + os.write(RETRY); + os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getData() != null) { + Class<?> payloadClass = p.getType(); + Type payloadType = p.getGenericType(); + if (payloadType == null) { + payloadType = payloadClass; + } + + if (payloadType == null && payloadClass == null) { + payloadType = Object.class; + payloadClass = Object.class; + } + + os.write(DATA); + writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os); + os.write(NEW_LINE); + } + } + + @SuppressWarnings("unchecked") + private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt, + MultivaluedMap<String, Object> headers, Object data, OutputStream os) + throws IOException, WebApplicationException { + + MessageBodyWriter<T> writer = null; + if (message != null && factory != null) { + writer = factory.createMessageBodyWriter(cls, type, anns, mt, message); + } + + if (writer == null) { + throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName()); + } + + writer.writeTo((T)data, cls, type, anns, mt, headers, os); + } + + @Override + public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations, + MediaType mediaType) { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java new file mode 100644 index 0000000..f852637 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.lang.reflect.Type; + +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; + +public class OutboundSseEventImpl implements OutboundSseEvent { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class<?> type; + private Type genericType; + private MediaType mediaType; + private Object data; + + public static class BuilderImpl implements Builder { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class<?> type; + private Type genericType; + private MediaType mediaType; + private Object data; + + @Override + public Builder id(String id) { + this.id = id; + return this; + } + + @Override + public Builder name(String name) { + this.name = name; + return this; + } + + @Override + public Builder reconnectDelay(long milliseconds) { + this.reconnectDelay = milliseconds; + return this; + } + + @Override + public Builder mediaType(MediaType mediaType) { + this.mediaType = mediaType; + return this; + } + + @Override + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + @Override + @SuppressWarnings("rawtypes") + public Builder data(Class type, Object data) { + this.type = type; + this.data= data; + return this; + } + + @Override + @SuppressWarnings("rawtypes") + public Builder data(GenericType type, Object data) { + this.genericType = type.getType(); + this.data= data; + return this; + } + + @Override + public Builder data(Object data) { + this.data = data; + return this; + } + + @Override + public OutboundSseEvent build() { + return new OutboundSseEventImpl( + id, + name, + comment, + reconnectDelay, + type, + genericType, + mediaType, + data + ); + } + + } + + OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay, + Class<?> type, Type genericType, MediaType mediaType, Object data) { + this.id = id; + this.name = name; + this.comment = comment; + this.reconnectDelay = reconnectDelay; + this.type = type; + this.genericType = genericType; + this.mediaType = mediaType; + this.data = data; + } + + @Override + public String getId() { + return id; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getComment() { + return comment; + } + + @Override + public long getReconnectDelay() { + return reconnectDelay; + } + + @Override + public boolean isReconnectDelaySet() { + return reconnectDelay != -1; + } + + @Override + public Class<?> getType() { + return type; + } + + @Override + public Type getGenericType() { + return genericType; + } + + @Override + public MediaType getMediaType() { + return mediaType; + } + + @Override + public Object getData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java new file mode 100644 index 0000000..977a6b2 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventOutput; + +public class SseBroadcasterImpl implements SseBroadcaster { + private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>(); + private final Set<Listener> listeners = new CopyOnWriteArraySet<>(); + + @Override + public boolean register(Listener listener) { + return listeners.add(listener); + } + + @Override + public boolean register(SseEventOutput output) { + return outputs.add(output); + } + + @Override + public void broadcast(OutboundSseEvent event) { + for (final SseEventOutput output: outputs) { + try { + output.write(event); + } catch (final IOException ex) { + listeners.forEach(listener -> listener.onException(output, ex)); + } + } + } + + @Override + public void close() { + for (final SseEventOutput output: outputs) { + try { + output.close(); + listeners.forEach(listener -> listener.onClose(output)); + } catch (final IOException ex) { + listeners.forEach(listener -> listener.onException(output, ex)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java new file mode 100644 index 0000000..7f7963f --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseEventOutput; + +@Provider +public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> { + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return SseEventOutput.class.isAssignableFrom(cls); + } + + @Override + public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType) { + return -1; + } + + @Override + public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType, + final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream) + throws IOException, WebApplicationException { + // do nothing. + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java new file mode 100644 index 0000000..da682a0 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cxf.Bus; +import org.apache.cxf.endpoint.Server; +import org.apache.cxf.feature.AbstractFeature; +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider; + +public class SseFeature extends AbstractFeature { + @Override + public void initialize(Server server, Bus bus) { + final List<Object> providers = new ArrayList<>(); + + providers.add(new SseAtmosphereContextProvider()); + providers.add(new SseEventOutputProvider()); + + ((ServerProviderFactory) server.getEndpoint().get( + ServerProviderFactory.class.getName())).setUserProviders(providers); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java new file mode 100644 index 0000000..de2c3a9 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseContext; + +import org.apache.cxf.jaxrs.ext.ContextProvider; +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Message; +import org.apache.cxf.transport.http.AbstractHTTPDestination; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; + +@Provider +public class SseAtmosphereContextProvider implements ContextProvider<SseContext> { + @Override + public SseContext createContext(Message message) { + final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST); + if (request == null) { + throw new IllegalStateException("Unable to retrieve HTTP request from the context"); + } + + final AtmosphereResource resource = (AtmosphereResource)request + .getAttribute(AtmosphereResource.class.getName()); + if (resource == null) { + throw new IllegalStateException("AtmosphereResource is not present, " + + "is AtmosphereServlet configured properly?"); + } + + final Broadcaster broadcaster = resource.getAtmosphereConfig() + .getBroadcasterFactory() + .lookup(resource.uuid(), true); + + resource.removeFromAllBroadcasters(); + resource.setBroadcaster(broadcaster); + + return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java new file mode 100644 index 0000000..dbf15ad --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.SseEventOutput; + +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SseAtmosphereEventOutputImpl implements SseEventOutput { + private static final Logger LOGGER = LoggerFactory.getLogger(SseAtmosphereEventOutputImpl.class); + + private final AtmosphereResource resource; + private final MessageBodyWriter<OutboundSseEvent> writer; + private volatile boolean closed = false; + + public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer, + final AtmosphereResource resource) { + this.writer = writer; + this.resource = resource; + + if (!resource.isSuspended()) { + resource.suspend(); + } + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + + if (resource.isSuspended()) { + resource.resume(); + } + + final Broadcaster broadcaster = resource.getBroadcaster(); + resource.removeFromAllBroadcasters(); + + try { + if (!resource.getResponse().isCommitted()) { + resource.getResponse().flushBuffer(); + } + } finally { + resource.close(); + broadcaster.destroy(); + } + } + } + + @Override + public void write(OutboundSseEvent event) throws IOException { + if (!closed && writer != null) { + try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) { + writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os); + + // Atmosphere broadcasts asynchronously which is acceptable in most cases. + // Unfortunately, calling close() may lead to response stream being closed + // while there are still some SSE delivery scheduled. + final Future<Object> future = resource + .getBroadcaster() + .broadcast(os.toString(StandardCharsets.UTF_8.name())); + + try { + if (!future.isDone()) { + // Let us wait at least 200 milliseconds before returning to ensure + // that SSE had the opportunity to be delivered. + future.get(200, TimeUnit.MILLISECONDS); + } + } catch (final ExecutionException | InterruptedException ex) { + throw new IOException(ex); + } catch (final TimeoutException ex) { + LOGGER.warn("SSE was not delivered within default timeout"); + } + } + } + } + + @Override + public boolean isClosed() { + return closed; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java new file mode 100644 index 0000000..3b91c83 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; + +import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.AsyncIOInterceptorAdapter; +import org.atmosphere.cpr.AsyncIOWriter; +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.interceptor.AllowInterceptor; +import org.atmosphere.interceptor.SSEAtmosphereInterceptor; +import org.atmosphere.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS; +import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM; +import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL; +import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE; + +/** + * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original + * implementation does two things which do not fit well into SSE support: + * - closes the response stream (overridden by SseAtmosphereInterceptorWriter) + * - wraps the whatever object is being written to SSE payload (overridden using + * the complete SSE protocol) + */ +public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(SseAtmosphereInterceptor.class); + + private static final byte[] PADDING; + private static final String PADDING_TEXT; + private static final byte[] END = "\r\n\r\n".getBytes(); + + static { + StringBuffer whitespace = new StringBuffer(); + for (int i = 0; i < 2000; i++) { + whitespace.append(" "); + } + whitespace.append("\n"); + PADDING_TEXT = whitespace.toString(); + PADDING = PADDING_TEXT.getBytes(); + } + + private boolean writePadding(AtmosphereResponse response) { + if (response.request() != null && response.request().getAttribute("paddingWritten") != null) { + return false; + } + + response.setContentType(SERVER_SENT_EVENTS); + response.setCharacterEncoding("utf-8"); + boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM); + if (isUsingStream) { + try { + OutputStream stream = response.getResponse().getOutputStream(); + try { + stream.write(PADDING); + stream.flush(); + } catch (IOException ex) { + LOGGER.warn("SSE may not work", ex); + } + } catch (IOException e) { + LOGGER.trace("", e); + } + } else { + try { + PrintWriter w = response.getResponse().getWriter(); + w.println(PADDING_TEXT); + w.flush(); + } catch (IOException e) { + LOGGER.trace("", e); + } + } + response.resource().getRequest().setAttribute("paddingWritten", "true"); + return true; + } + + @Override + public Action inspect(final AtmosphereResource r) { + if (Utils.webSocketMessage(r)) { + return Action.CONTINUE; + } + + final AtmosphereRequest request = r.getRequest(); + final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim(); + + if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) { + final AtmosphereResponse response = r.getResponse(); + if (response.getAsyncIOWriter() == null) { + response.asyncIOWriter(new SseAtmosphereInterceptorWriter()); + } + + r.addEventListener(new P(response)); + + AsyncIOWriter writer = response.getAsyncIOWriter(); + if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) { + AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() { + private boolean padding() { + if (!r.isSuspended()) { + return writePadding(response); + } + return false; + } + + @Override + public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) { + padding(); + } + + @Override + public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) { + // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere + // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource + if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null + || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) { + response.write(END, true); + } + + /** + * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we + * resume after every message. + */ + String ua = r.getRequest().getHeader("User-Agent"); + if (ua != null && ua.contains("MSIE")) { + try { + response.flushBuffer(); + } catch (IOException e) { + LOGGER.trace("", e); + } + r.resume(); + } + } + }); + } else { + LOGGER.warn("Unable to apply {}. Your AsyncIOWriter must implement {}", + getClass().getName(), AtmosphereInterceptorWriter.class.getName()); + } + } + + return Action.CONTINUE; + } + + private final class P extends OnPreSuspend implements AllowInterceptor { + + private final AtmosphereResponse response; + + private P(AtmosphereResponse response) { + this.response = response; + } + + @Override + public void onPreSuspend(AtmosphereResourceEvent event) { + writePadding(response); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java new file mode 100644 index 0000000..24ebfd9 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; + +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereResponse; + +public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter { + @Override + public void close(AtmosphereResponse response) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java new file mode 100644 index 0000000..c330d6c --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.OutboundSseEvent.Builder; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseContext; +import javax.ws.rs.sse.SseEventOutput; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter; +import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl; +import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl; +import org.apache.cxf.jaxrs.utils.JAXRSUtils; +import org.atmosphere.cpr.AtmosphereResource; + +public class SseAtmosphereResourceContext implements SseContext { + private final AtmosphereResource resource; + private final ServerProviderFactory factory; + + SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) { + this.factory = factory; + this.resource = resource; + } + + @Override + public SseEventOutput newOutput() { + final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory, + JAXRSUtils.getCurrentMessage().getExchange()); + return new SseAtmosphereEventOutputImpl(writer, resource); + } + + @Override + public Builder newEvent() { + return new OutboundSseEventImpl.BuilderImpl(); + } + + @Override + public SseBroadcaster newBroadcaster() { + return new SseBroadcasterImpl(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/23d6d663/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java new file mode 100644 index 0000000..bc87ebf --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.servlet; + +import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor; +import org.apache.cxf.transport.servlet.CXFNonSpringServlet; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AtmosphereServlet; +import org.atmosphere.handler.ReflectorServletProcessor; + +public class CXFSseServlet extends AtmosphereServlet { + private static final long serialVersionUID = -874047746532165731L; + + public CXFSseServlet(final CXFNonSpringServlet delegate) { + // Register and map the dispatcher servlet + super(true); + + framework().addAtmosphereHandler("/*", new ReflectorServletProcessor(delegate)); + framework().interceptor(new SseAtmosphereInterceptor()); + framework().addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); + framework().addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework().addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true"); + framework().addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true"); + } +}