This is an automated email from the ASF dual-hosted git repository. jgallimore pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomee.git
commit 70bcfcbe40e90e0a3d194ec2d532be1b730e2353 Author: Jonathan Gallimore <[email protected]> AuthorDate: Thu Nov 26 00:35:08 2020 +0000 Adding SSE --- .../apache/openejb/config/AnnotationDeployer.java | 3 + .../org/apache/openejb/server/cxf/rs/Contexts.java | 5 + .../apache/openejb/server/cxf/rs/CxfRSService.java | 4 +- .../openejb/server/cxf/rs/CxfRsHttpListener.java | 39 ++++- .../server/cxf/rs/CDISSEApplicationTest.java | 162 +++++++++++++++++++++ 5 files changed, 211 insertions(+), 2 deletions(-) diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java b/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java index 19efeff..fceb33e 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java @@ -1157,6 +1157,9 @@ public class AnnotationDeployer implements DynamicDeployer { webModule.getRestClasses().addAll(findRestClasses(webModule, finder)); addJaxRsProviders(finder, webModule.getJaxrsProviders(), Provider.class); + // CXF actually does this in its own CDI setup - org.apache.cxf.cdi.JAXRSCdiResourceExtension#collect(javax.enterprise.inject.spi.ProcessBean<T>) + //addJaxRsProviders(finder, webModule.getJaxrsProviders(), Path.class); + // Applications with a default constructor // findSubclasses will not work by default to gain a lot of time // look FinderFactory for the flag to activate it or diff --git a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/Contexts.java b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/Contexts.java index cfd7257..03c0b58 100644 --- a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/Contexts.java +++ b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/Contexts.java @@ -47,6 +47,8 @@ import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import javax.ws.rs.ext.ContextResolver; import javax.ws.rs.ext.Providers; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.util.Collection; @@ -102,6 +104,9 @@ public final class Contexts { classes.add(ServletConfig.class); classes.add(ServletContext.class); classes.add(MessageContext.class); + classes.add(Sse.class); + classes.add(SseEventSink.class); + return classes; } diff --git a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java index c27fd8b..70b8c64 100644 --- a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java +++ b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java @@ -229,7 +229,9 @@ public class CxfRSService extends RESTService { if (userConfiguredJsonProviders == null) { jsonProviders = asList( "org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonbProvider", - "org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonpProvider"); + "org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonpProvider", + "org.apache.cxf.jaxrs.sse.SseContextProvider", + "org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider"); } else { jsonProviders = asList(userConfiguredJsonProviders.split(",")); } diff --git a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java index 514ea51..c4f97d9 100644 --- a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java +++ b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java @@ -92,8 +92,11 @@ import org.apache.webbeans.config.WebBeansContext; import org.apache.webbeans.container.BeanManagerImpl; import org.apache.webbeans.context.creational.CreationalContextImpl; +import javax.enterprise.context.Dependent; import javax.enterprise.context.spi.CreationalContext; +import javax.enterprise.inject.InjectionException; import javax.enterprise.inject.spi.Bean; +import javax.inject.Singleton; import javax.management.ObjectName; import javax.management.openmbean.TabularData; import javax.naming.Context; @@ -109,6 +112,7 @@ import javax.validation.ValidatorFactory; import javax.validation.metadata.MethodDescriptor; import javax.ws.rs.ConstrainedTo; import javax.ws.rs.RuntimeType; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.client.ClientRequestFilter; import javax.ws.rs.core.Application; import javax.ws.rs.core.Configuration; @@ -462,9 +466,11 @@ public class CxfRsHttpListener implements RsHttpListener { factory.setServiceClass(clazz); } + factory.setTransportId("http://cxf.apache.org/transports/http/sse"); + server = factory.create(); - destination = (HttpDestination) server.getDestination(); + destination = (HttpDestination) server.getDestination(); fireServerCreated(oldLoader); } finally { if (oldLoader != null) { @@ -664,6 +670,33 @@ public class CxfRsHttpListener implements RsHttpListener { final Object proxy = ProxyEJB.subclassProxy(restServiceInfo.context); factory.setResourceProvider(clazz, new NoopResourceProvider(restServiceInfo.context.getBeanClass(), proxy)); } else { + // check if its a singleton. + + if (owbCtx != null) { + final BeanManagerImpl bm = owbCtx.getBeanManagerImpl(); + Bean<?> bean = null; + + if (bm != null && bm.isInUse()) { + try { + final Set<Bean<?>> beans = bm.getBeans(clazz); + bean = bm.resolve(beans); + } catch (final InjectionException ie) { + final String msg = "Resource class " + clazz.getName() + " can not be instantiated"; + LOGGER.warning(msg, ie); + throw new WebApplicationException(Response.serverError().entity(msg).build()); + } + + if (bean != null && isConsideredSingleton(bean.getScope())) { + final Object reference = bm.getReference(bean, bean.getBeanClass(), bm.createCreationalContext(bean)); + factory.setResourceProvider(clazz, new CdiSingletonResourceProvider( + classLoader, clazz, reference, injections, context, owbCtx)); + + continue; + } + } + } + + factory.setResourceProvider(clazz, new OpenEJBPerRequestPojoResourceProvider( classLoader, clazz, injections, context, owbCtx)); } @@ -758,6 +791,10 @@ public class CxfRsHttpListener implements RsHttpListener { } } + private boolean isConsideredSingleton(final Class<?> scope) { + return Singleton.class == scope || Dependent.class == scope; + } + private void fixProviderIfKnown() { final ServerProviderFactory spf = ServerProviderFactory.class.cast(server.getEndpoint().get(ServerProviderFactory.class.getName())); for (final String field : asList("messageWriters", "messageReaders")) { diff --git a/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java b/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java new file mode 100644 index 0000000..9e350b5 --- /dev/null +++ b/server/openejb-cxf-rs/src/test/java/org/apache/openejb/server/cxf/rs/CDISSEApplicationTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.server.cxf.rs; + +import org.apache.openejb.jee.WebApp; +import org.apache.openejb.junit.ApplicationComposer; +import org.apache.openejb.testing.Classes; +import org.apache.openejb.testing.Configuration; +import org.apache.openejb.testing.EnableServices; +import org.apache.openejb.testing.Module; +import org.apache.openejb.testng.PropertiesBuilder; +import org.apache.openejb.util.NetworkUtil; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventSink; +import java.io.Serializable; +import java.util.Date; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +@EnableServices("jax-rs") +@RunWith(ApplicationComposer.class) +@Ignore +public class CDISSEApplicationTest { + + private static int port = -1; + + @BeforeClass + public static void beforeClass() { + port = NetworkUtil.getNextAvailablePort(); + } + + @Configuration + public Properties props() { + return new PropertiesBuilder().p("httpejbd.port", Integer.toString(port)).build(); + } + + @Module + @Classes(cdi = true, value = {MyCdiRESTApplication.class, Resource.class}) + public WebApp war() { + return new WebApp() + .contextRoot("foo") + .addServlet("REST Application", Application.class.getName()) + .addInitParam("REST Application", "javax.ws.rs.Application", MyCdiRESTApplication.class.getName()); + } + + @Test + public void isCdi() { + assertEquals("[{\"id\":1}]", ClientBuilder.newClient().target("http://localhost:" + port).path("/foo/sse").request().get(String.class)); + } + + public static class MyCdiRESTApplication extends Application { + + } + + @Path("sse") + public static class Resource { + + private SseBroadcaster broadcaster; + private OutboundSseEvent.Builder builder; + private AtomicLong eventId = new AtomicLong(); + private HttpServletRequest request; + + public Resource() { + System.out.println("Resource created"); + } + + @Context + public void setHttpRequest(final HttpServletRequest request) { + this.request = request; + } + + @Context + public void setSse(final Sse sse) { + this.broadcaster = sse.newBroadcaster(); + this.builder = sse.newEventBuilder(); + } + + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void send(final String message) { + broadcaster.broadcast(createEvent(builder, eventId.incrementAndGet(), message)); + } + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void stats(final @Context SseEventSink sink) { + broadcaster.register(sink); + } + + private static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final long eventId, final String text) { + return builder + .id("" + eventId) + .data(Message.class, new Message(new Date().getTime(), text)) + .mediaType(MediaType.APPLICATION_JSON_TYPE) + .build(); + } + } + + public static class Message implements Serializable { + private static final long serialVersionUID = -6705829915457870975L; + + private long timestamp; + private String text; + + public Message() { + } + + public Message(final long timestamp, final String text) { + this.timestamp = timestamp; + this.text = text; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } + + public String getText() { + return text; + } + + public void setText(final String text) { + this.text = text; + } + } +}
