This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new c63b865b5 KNOX-3198: Check for async support on SSE request (#1095)
c63b865b5 is described below
commit c63b865b550917dfa2c9dc23a23fc82c34867d72
Author: hanicz <[email protected]>
AuthorDate: Tue Oct 14 11:29:51 2025 +0200
KNOX-3198: Check for async support on SSE request (#1095)
---
.../gateway/ha/dispatch/SSEHaDispatchTest.java | 2 +
.../apache/knox/gateway/SpiGatewayMessages.java | 3 ++
.../org/apache/knox/gateway/sse/SSEDispatch.java | 28 ++++++++++---
.../apache/knox/gateway/sse/SSEDispatchTest.java | 48 +++++++++++++++-------
4 files changed, 60 insertions(+), 21 deletions(-)
diff --git a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
index d5cc9dadd..b8b224735 100644
--- a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
+++ b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
@@ -109,6 +109,7 @@ public class SSEHaDispatchTest {
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+ expect(gatewayConfig.isAsyncSupported()).andReturn(true).anyTimes();
GatewayServices gatewayServices = createMock(GatewayServices.class);
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
@@ -686,6 +687,7 @@ public class SSEHaDispatchTest {
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+ expect(gatewayConfig.isAsyncSupported()).andReturn(true).anyTimes();
GatewayServices gatewayServices = createMock(GatewayServices.class);
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
index 9992602b6..5404e3ebe 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
@@ -147,4 +147,7 @@ public interface SpiGatewayMessages {
@Message( level = MessageLevel.ERROR, text = "Failed to send error to client" )
void failedToSendErrorToClient(@StackTrace(level=MessageLevel.ERROR) Exception e);
+
+ @Message( level = MessageLevel.ERROR, text = "Async support is not enabled. SSEDispatch request failed." )
+ void asyncSupportNotEnabled();
}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
index 532503df5..2683d91b8 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -35,6 +35,7 @@ import org.apache.http.protocol.HttpContext;
import org.apache.knox.gateway.audit.api.Action;
import org.apache.knox.gateway.audit.api.ActionOutcome;
import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.dispatch.AsyncDispatch;
import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
@@ -52,15 +53,20 @@ import java.nio.charset.StandardCharsets;
public class SSEDispatch extends ConfigurableDispatch implements AsyncDispatch {
- protected final HttpAsyncClient asyncClient;
+ protected HttpAsyncClient asyncClient;
private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+ private final boolean asyncSupported;
public SSEDispatch(FilterConfig filterConfig) {
- HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory();
- this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig);
-
- if (asyncClient instanceof CloseableHttpAsyncClient) {
- ((CloseableHttpAsyncClient) this.asyncClient).start();
+ GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+ this.asyncSupported = gatewayConfig.isAsyncSupported();
+ if(asyncSupported) {
+ HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory();
+ this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+ if (asyncClient instanceof CloseableHttpAsyncClient) {
+ ((CloseableHttpAsyncClient) this.asyncClient).start();
+ }
}
}
@@ -96,6 +102,10 @@ public class SSEDispatch extends ConfigurableDispatch implements AsyncDispatch {
}
private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
+ if(!asyncSupported) {
+ this.handleAsyncNotSupportedResponse(outboundResponse);
+ return;
+ }
this.addAcceptHeader(httpMethod);
this.copyRequestHeaderFields(httpMethod, inboundRequest);
this.executeRequestWrapper(httpMethod, inboundRequest, outboundResponse);
@@ -135,6 +145,12 @@ public class SSEDispatch extends ConfigurableDispatch implements AsyncDispatch {
auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI,
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
}
+ private void handleAsyncNotSupportedResponse(HttpServletResponse outboundResponse) throws IOException {
+ LOG.asyncSupportNotEnabled();
+ outboundResponse.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ "Async support is not enabled. SSE requests cannot be processed.");
+ }
+
private void prepareServletResponse(HttpServletResponse outboundResponse, int statusCode) {
LOG.dispatchResponseStatusCode(statusCode);
outboundResponse.setStatus(statusCode);
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
index e6fac3cb4..4981b8b2b 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -67,7 +67,7 @@ public class SSEDispatchTest {
@Test
public void testCreateAndDestroyClient() throws Exception {
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
assertNotNull(sseDispatch.getAsyncClient());
sseDispatch.destroy();
@@ -77,7 +77,7 @@ public class SSEDispatchTest {
@Test
public void testGet2xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -107,7 +107,7 @@ public class SSEDispatchTest {
@Test
public void testGet4xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_BAD_REQUEST);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -130,7 +130,7 @@ public class SSEDispatchTest {
@Test
public void testGet5xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -153,7 +153,7 @@ public class SSEDispatchTest {
@Test
public void testPost2xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -184,7 +184,7 @@ public class SSEDispatchTest {
@Test
public void testPost4xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -207,7 +207,7 @@ public class SSEDispatchTest {
@Test
public void testPost5xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -230,7 +230,7 @@ public class SSEDispatchTest {
@Test
public void testPut2xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -261,7 +261,7 @@ public class SSEDispatchTest {
@Test
public void testPut4xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -284,7 +284,7 @@ public class SSEDispatchTest {
@Test
public void testPut5xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -307,7 +307,7 @@ public class SSEDispatchTest {
@Test
public void testPatch2xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -338,7 +338,7 @@ public class SSEDispatchTest {
@Test
public void testPatch4xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -361,7 +361,7 @@ public class SSEDispatchTest {
@Test
public void testPatch5xx() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
@@ -384,7 +384,7 @@ public class SSEDispatchTest {
@Test
public void testServerNotAvailable() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEDispatch sseDispatch = this.createDispatch(true);
HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error");
EasyMock.expectLastCall().once();
@@ -399,6 +399,23 @@ public class SSEDispatchTest {
EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
}
+ @Test
+ public void testAsyncNotSupported() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ SSEDispatch sseDispatch = this.createDispatch(false);
+ HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+ outboundResponse.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ "Async support is not enabled. SSE requests cannot be processed.");
+ EasyMock.expectLastCall().once();
+
+ replay(outboundResponse);
+
+ sseDispatch.doGet(new URI("http://localhost:11223/sse"), null, outboundResponse);
+
+ latch.await(1L, TimeUnit.SECONDS);
+ EasyMock.verify(outboundResponse);
+ }
+
private HttpServletRequest getHttpServletRequest(AsyncContext
asyncContext) throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put("request", "header");
@@ -451,7 +468,7 @@ public class SSEDispatchTest {
EasyMock.expectLastCall().times(2);
}
- private SSEDispatch createDispatch() throws Exception {
+ private SSEDispatch createDispatch(boolean asyncSupported) throws Exception {
KeystoreService keystoreService = createMock(KeystoreService.class);
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
@@ -461,6 +478,7 @@ public class SSEDispatchTest {
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+ expect(gatewayConfig.isAsyncSupported()).andReturn(asyncSupported).once();
GatewayServices gatewayServices = createMock(GatewayServices.class);
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();