This is an automated email from the ASF dual-hosted git repository.

jgallimore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomee.git

commit f4ee15f62de7b202d1c0ff43883219a85ce95c47
Author: Jonathan Gallimore <[email protected]>
AuthorDate: Thu Nov 26 15:17:43 2020 +0000

    Wrap SseEventSinkImpl to prevent any kind of use after close
---
 .../apache/openejb/server/cxf/rs/CxfRSService.java |  4 +-
 .../openejb/server/cxf/rs/CxfRsHttpListener.java   |  6 +++
 .../rs/sse/TomEESseEventSinkContextProvider.java   | 61 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git 
a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java
 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java
index 70b8c64..c27fd8b 100644
--- 
a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java
+++ 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRSService.java
@@ -229,9 +229,7 @@ public class CxfRSService extends RESTService {
             if (userConfiguredJsonProviders == null) {
                 jsonProviders = asList(
                         
"org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonbProvider",
-                        
"org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonpProvider",
-                        "org.apache.cxf.jaxrs.sse.SseContextProvider",
-                        
"org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider");
+                        
"org.apache.openejb.server.cxf.rs.johnzon.TomEEJsonpProvider");
             } else {
                 jsonProviders = asList(userConfiguredJsonProviders.split(","));
             }
diff --git 
a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java
 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java
index c4f97d9..d857d51 100644
--- 
a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java
+++ 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/CxfRsHttpListener.java
@@ -38,6 +38,8 @@ import org.apache.cxf.jaxrs.model.OperationResourceInfo;
 import org.apache.cxf.jaxrs.model.ProviderInfo;
 import org.apache.cxf.jaxrs.provider.ProviderFactory;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
 import org.apache.cxf.jaxrs.utils.JAXRSUtils;
 import org.apache.cxf.jaxrs.utils.HttpUtils;
 import org.apache.cxf.jaxrs.validation.JAXRSBeanValidationInInterceptor;
@@ -74,6 +76,7 @@ import org.apache.openejb.rest.ThreadLocalContextManager;
 import org.apache.openejb.server.cxf.rs.event.ExtensionProviderRegistration;
 import org.apache.openejb.server.cxf.rs.event.ServerCreated;
 import org.apache.openejb.server.cxf.rs.event.ServerDestroyed;
+import org.apache.openejb.server.cxf.rs.sse.TomEESseEventSinkContextProvider;
 import org.apache.openejb.server.cxf.transport.HttpDestination;
 import org.apache.openejb.server.cxf.transport.util.CxfUtil;
 import org.apache.openejb.server.httpd.HttpRequest;
@@ -741,6 +744,9 @@ public class CxfRsHttpListener implements RsHttpListener {
             }
 
             try {
+                factory.setProvider(new SseContextProvider());
+                factory.setProvider(new TomEESseEventSinkContextProvider());
+
                 server = factory.create();
                 fixProviderIfKnown();
                 fireServerCreated(oldLoader);
diff --git 
a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/sse/TomEESseEventSinkContextProvider.java
 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/sse/TomEESseEventSinkContextProvider.java
new file mode 100644
index 0000000..912a830
--- /dev/null
+++ 
b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/sse/TomEESseEventSinkContextProvider.java
@@ -0,0 +1,61 @@
+/*
+ *     Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+package org.apache.openejb.server.cxf.rs.sse;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkImpl;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+import java.util.concurrent.CompletionStage;
+
+public class TomEESseEventSinkContextProvider extends 
SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+    @Override
+    protected SseEventSink createSseEventSink(final HttpServletRequest request,
+                                              final 
MessageBodyWriter<OutboundSseEvent> writer,
+                                              final AsyncResponse async, final 
Integer bufferSize) {
+        if (bufferSize != null) {
+            return new TomEESseEventSink(writer, async, 
request.getAsyncContext(), bufferSize);
+        } else {
+            return new TomEESseEventSink(writer, async, 
request.getAsyncContext());
+        }
+    }
+
+    public static class TomEESseEventSink extends SseEventSinkImpl implements 
SseEventSink {
+        public TomEESseEventSink(final MessageBodyWriter<OutboundSseEvent> 
writer, final AsyncResponse async, final AsyncContext ctx) {
+            super(writer, async, ctx);
+        }
+
+        public TomEESseEventSink(final MessageBodyWriter<OutboundSseEvent> 
writer, final AsyncResponse async, final AsyncContext ctx, final int 
bufferSize) {
+            super(writer, async, ctx, bufferSize);
+        }
+
+        @Override
+        public CompletionStage<?> send(OutboundSseEvent event) {
+            if (isClosed()) {
+                throw new IllegalStateException("The event sink is closed");
+            }
+
+            return super.send(event);
+        }
+    }
+}

Reply via email to