This is an automated email from the ASF dual-hosted git repository.
pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 58dd02b02 KNOX-3065: SSE support for Knox (#947)
58dd02b02 is described below
commit 58dd02b02c62b4bb5f54edd9d91ff29d4d616fb2
Author: hanicz <[email protected]>
AuthorDate: Mon Nov 11 14:46:31 2024 +0100
KNOX-3065: SSE support for Knox (#947)
---
gateway-spi/pom.xml | 18 +
.../apache/knox/gateway/SpiGatewayMessages.java | 15 +
.../dispatch/DefaultHttpAsyncClientFactory.java | 87 ++++
.../gateway/dispatch/DefaultHttpClientFactory.java | 18 +-
.../gateway/dispatch/GatewayDispatchFilter.java | 20 +-
.../gateway/dispatch/HttpAsyncClientFactory.java | 26 ++
.../org/apache/knox/gateway/sse/SSEDispatch.java | 234 ++++++++++
.../org/apache/knox/gateway/sse/SSEEntity.java | 150 +++++++
.../org/apache/knox/gateway/sse/SSEException.java | 25 ++
.../org/apache/knox/gateway/sse/SSEResponse.java | 158 +++++++
.../java/org/apache/knox/gateway/sse/SSEvent.java | 101 +++++
.../DefaultHttpAsyncClientFactoryTest.java | 76 ++++
.../apache/knox/gateway/sse/SSEDispatchTest.java | 483 +++++++++++++++++++++
.../org/apache/knox/gateway/sse/SSEEntityTest.java | 163 +++++++
.../org/apache/knox/gateway/sse/SSEventTest.java | 83 ++++
pom.xml | 11 +
16 files changed, 1656 insertions(+), 12 deletions(-)
diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml
index b6151b0bf..d09c39bf4 100644
--- a/gateway-spi/pom.xml
+++ b/gateway-spi/pom.xml
@@ -55,6 +55,24 @@
<artifactId>gateway-util-urltemplate</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
index 771e616f4..4440069d8 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
@@ -123,4 +123,19 @@ public interface SpiGatewayMessages {
@Message( level = MessageLevel.ERROR, text = "No valid principal found" )
void noPrincipalFound();
+
+ // Logged by SSEDispatch from the completed() callback of its async request.
+ @Message( level = MessageLevel.INFO, text = "Every event was read from the
SSE stream" )
+ void sseConnectionDone();
+
+ // Logged by SSEDispatch from the failed() callback of its async request;
+ // {0} receives the message of the exception passed to that callback.
+ @Message( level = MessageLevel.ERROR, text = "Error during SSE connection:
{0}" )
+ void sseConnectionError(String error);
+
+ // Logged by SSEDispatch when reading or forwarding received characters fails;
+ // the exception's stack trace is included at ERROR level.
+ @Message( level = MessageLevel.ERROR, text = "Error writing into the SSE
output stream : {0}" )
+ void errorWritingOutputStream(@StackTrace(level=MessageLevel.ERROR)
Exception e);
+
+ // Logged by SSEDispatch from the cancelled() callback of its async request.
+ @Message( level = MessageLevel.WARN, text = "SSE connection cancelled" )
+ void sseConnectionCancelled();
+
+ // Logged by SSEDispatch when closing the async request producer throws an IOException;
+ // the exception's stack trace is included at ERROR level.
+ @Message( level = MessageLevel.ERROR, text = "Unable to close SSE producer" )
+ void sseProducerCloseError(@StackTrace(level=MessageLevel.ERROR) Exception
e);
}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java
new file mode 100644
index 000000000..24840233f
--- /dev/null
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.metrics.MetricsService;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.FilterConfig;
+
+/**
+ * {@link HttpAsyncClientFactory} implementation that assembles a non-blocking HTTP client.
+ * SSL, connection-limit and request settings are obtained through the helpers inherited
+ * from {@link DefaultHttpClientFactory}.
+ */
+public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory
+    implements HttpAsyncClientFactory {
+
+  /**
+   * Builds an async HTTP client from the given filter configuration.
+   *
+   * @param filterConfig supplies the service role init parameter and the servlet context
+   *                     attributes that hold the gateway config and gateway services
+   * @return the client produced by the fully configured builder
+   */
+  @Override
+  public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) {
+    final String serviceRole = filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE);
+    final GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext()
+        .getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+    final GatewayServices services = (GatewayServices) filterConfig.getServletContext()
+        .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
+
+    final HttpAsyncClientBuilder builder = newBuilder(gatewayConfig, services);
+
+    // A custom SSLContext is only applied when one could be created for this service.
+    final SSLContext sslContext = createSSLContext(services, filterConfig, serviceRole);
+    if (sslContext != null) {
+      builder.setSSLContext(sslContext);
+    }
+
+    configureAuthentication(builder, gatewayConfig);
+
+    final int maxConnections = getMaxConnections(filterConfig);
+    return builder
+        .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+        .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+        .setRedirectStrategy(new NeverRedirectStrategy())
+        .setMaxConnTotal(maxConnections)
+        .setMaxConnPerRoute(maxConnections)
+        .setDefaultRequestConfig(getRequestConfig(filterConfig, serviceRole))
+        .build();
+  }
+
+  /** Returns a metrics-instrumented builder when metrics are enabled, a plain one otherwise. */
+  private static HttpAsyncClientBuilder newBuilder(GatewayConfig gatewayConfig,
+                                                   GatewayServices services) {
+    if (gatewayConfig == null || !gatewayConfig.isMetricsEnabled()) {
+      return HttpAsyncClients.custom();
+    }
+    final MetricsService metricsService = services.getService(ServiceType.METRICS_SERVICE);
+    return metricsService.getInstrumented(HttpAsyncClientBuilder.class);
+  }
+
+  /**
+   * Registers SPNEGO authentication when the Kerberos system property is set to true;
+   * otherwise installs a cookie store that keeps no cookies.
+   */
+  private static void configureAuthentication(HttpAsyncClientBuilder builder,
+                                              GatewayConfig gatewayConfig) {
+    final boolean kerberosSecured =
+        Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED));
+    if (!kerberosSecured) {
+      builder.setDefaultCookieStore(new NoCookieStore());
+      return;
+    }
+
+    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+    credentialsProvider.setCredentials(AuthScope.ANY, new UseJaasCredentials());
+
+    final Registry<AuthSchemeProvider> authSchemeRegistry =
+        RegistryBuilder.<AuthSchemeProvider>create()
+            .register(AuthSchemes.SPNEGO, new KnoxSpnegoAuthSchemeFactory(true))
+            .build();
+
+    builder.setDefaultAuthSchemeRegistry(authSchemeRegistry)
+        .setDefaultCookieStore(new HadoopAuthCookieStore(gatewayConfig))
+        .setDefaultCredentialsProvider(credentialsProvider);
+  }
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
index 03a4f891a..cb658c8fe 100644
---
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
@@ -69,7 +69,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
public class DefaultHttpClientFactory implements HttpClientFactory {
private static final SpiGatewayMessages LOG =
MessagesFactory.get(SpiGatewayMessages.class);
- private static final String PARAMETER_SERVICE_ROLE = "serviceRole";
+ protected static final String PARAMETER_SERVICE_ROLE = "serviceRole";
static final String PARAMETER_USE_TWO_WAY_SSL = "useTwoWaySsl";
/* retry in case of NoHttpResponseException */
static final String PARAMETER_RETRY_COUNT = "retryCount";
@@ -255,7 +255,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
return builder.build();
}
- private static class NoCookieStore implements CookieStore {
+ protected static class NoCookieStore implements CookieStore {
@Override
public void addCookie(Cookie cookie) {
//no op
@@ -277,7 +277,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
}
}
- private static class NeverRedirectStrategy implements RedirectStrategy {
+ protected static class NeverRedirectStrategy implements RedirectStrategy {
@Override
public boolean isRedirected( HttpRequest request, HttpResponse response,
HttpContext context )
throws ProtocolException {
@@ -298,7 +298,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
}
}
- private static class UseJaasCredentials implements Credentials {
+ protected static class UseJaasCredentials implements Credentials {
@Override
public String getPassword() {
@@ -312,7 +312,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
}
- private int getMaxConnections( FilterConfig filterConfig ) {
+ protected int getMaxConnections( FilterConfig filterConfig ) {
int maxConnections = 32;
GatewayConfig config =
(GatewayConfig)filterConfig.getServletContext().getAttribute(
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -330,7 +330,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
return maxConnections;
}
- private static int getConnectionTimeout( FilterConfig filterConfig ) {
+ protected static int getConnectionTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute(
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -348,7 +348,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
return timeout;
}
- private static int getSocketTimeout( FilterConfig filterConfig ) {
+ protected static int getSocketTimeout( FilterConfig filterConfig ) {
int timeout = -1;
GatewayConfig globalConfig =
(GatewayConfig)filterConfig.getServletContext().getAttribute(
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -366,7 +366,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
return timeout;
}
- private static long parseTimeout( String s ) {
+ protected static long parseTimeout( String s ) {
PeriodFormatter f = new PeriodFormatterBuilder()
.appendMinutes().appendSuffix("m"," min")
.appendSeconds().appendSuffix("s"," sec")
@@ -375,7 +375,7 @@ public class DefaultHttpClientFactory implements
HttpClientFactory {
return p.toStandardDuration().getMillis();
}
- private static String getCookieSpec(FilterConfig filterConfig) {
+ protected static String getCookieSpec(FilterConfig filterConfig) {
String cookieSpec = filterConfig.getInitParameter("httpclient.cookieSpec");
if (StringUtils.isNotBlank(cookieSpec)) {
return cookieSpec;
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
index 870d3f788..48196d137 100644
---
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -71,13 +72,13 @@ public class GatewayDispatchFilter extends
AbstractGatewayFilter {
synchronized(lock) {
if (dispatch == null) {
String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
- dispatch = newInstanceFromName(dispatchImpl);
+ dispatch = newInstanceFromName(dispatchImpl, filterConfig);
}
ConfigurationInjectorBuilder.configuration().target(dispatch).source(filterConfig).inject();
HttpClientFactory httpClientFactory;
String httpClientFactoryClass =
filterConfig.getInitParameter("httpClientFactory");
if (httpClientFactoryClass != null) {
- httpClientFactory = newInstanceFromName(httpClientFactoryClass);
+ httpClientFactory = newInstanceFromName(httpClientFactoryClass,
filterConfig);
} else {
httpClientFactory = new DefaultHttpClientFactory();
}
@@ -217,15 +218,28 @@ public class GatewayDispatchFilter extends
AbstractGatewayFilter {
}
}
- private <T> T newInstanceFromName(String dispatchImpl) throws
ServletException {
+ private <T> T newInstanceFromName(String dispatchImpl, FilterConfig
filterConfig) throws ServletException {
try {
Class<T> clazz = loadClass(dispatchImpl);
+ Constructor<?> constructor = this.getConstructorWithType(clazz,
FilterConfig.class);
+ if(constructor != null) {
+ return (T) constructor.newInstance(filterConfig);
+ }
return clazz.newInstance();
} catch ( Exception e ) {
throw new ServletException(e);
}
}
+ /**
+  * Looks up a public constructor of {@code clazz} that takes exactly one parameter
+  * whose declared type is exactly {@code type}.
+  *
+  * @return the matching constructor, or {@code null} when the class declares none
+  */
+ private <T> Constructor<?> getConstructorWithType(Class<T> clazz, Class<?> type) {
+   for (Constructor<?> candidate : clazz.getConstructors()) {
+     final Class<?>[] parameterTypes = candidate.getParameterTypes();
+     if (parameterTypes.length != 1) {
+       continue;
+     }
+     if (parameterTypes[0] == type) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
private <T> Class<T> loadClass(String className) throws
ClassNotFoundException {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
if ( loader == null ) {
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java
new file mode 100644
index 000000000..c5180ec01
--- /dev/null
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.nio.client.HttpAsyncClient;
+
+import javax.servlet.FilterConfig;
+
+/**
+ * Extension of {@link HttpClientFactory} for factories that can additionally
+ * create a non-blocking {@link HttpAsyncClient}.
+ */
+public interface HttpAsyncClientFactory extends HttpClientFactory {
+ /**
+ * Creates an async HTTP client for the given filter configuration.
+ *
+ * @param filterConfig the filter configuration the client is created from
+ * @return the created async HTTP client
+ */
+ HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig);
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
new file mode 100644
index 000000000..318c4bb82
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Dispatch that forwards a request with an async HTTP client and streams the
+ * {@code text/event-stream} response back to the caller event by event, using the
+ * servlet async API so the inbound request stays open while events arrive.
+ */
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    private final HttpAsyncClient asyncClient;
+
+    /**
+     * Creates the async client through {@link DefaultHttpAsyncClientFactory} and starts it
+     * when it is a {@link CloseableHttpAsyncClient}.
+     *
+     * @param filterConfig filter configuration handed to the client factory
+     */
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory();
+        this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.doHttpMethod(httpGetRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPostRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPutRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPatchRequest, inboundRequest, outboundResponse);
+    }
+
+    /** Sets the Accept header, copies the inbound request headers and starts the async exchange. */
+    private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest,
+                              HttpServletResponse outboundResponse) throws IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    /** Puts the inbound request into async mode (without timeout) and executes the outbound request. */
+    private void executeRequest(HttpUriRequest outboundRequest, HttpServletResponse outboundResponse,
+                                HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer =
+                new SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.executeAsyncRequest(producer, consumer, outboundRequest);
+    }
+
+    /** Logs and audits the dispatch, then runs it; every callback outcome closes the producer. */
+    private void executeAsyncRequest(HttpAsyncRequestProducer producer, AsyncCharConsumer<SSEResponse> consumer,
+                                     HttpUriRequest outboundRequest) {
+        LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI,
+                ActionOutcome.UNAVAILABLE, RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new FutureCallback<SSEResponse>() {
+
+            @Override
+            public void completed(final SSEResponse response) {
+                closeProducer(producer);
+                LOG.sseConnectionDone();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                closeProducer(producer);
+                LOG.sseConnectionError(ex.getMessage());
+            }
+
+            @Override
+            public void cancelled() {
+                closeProducer(producer);
+                LOG.sseConnectionCancelled();
+            }
+        });
+    }
+
+    private void addAcceptHeader(HttpUriRequest outboundRequest) {
+        outboundRequest.setHeader(HttpHeaders.ACCEPT, SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+    }
+
+    /** Copies status and headers of a 2xx response to the servlet response and audits the success. */
+    private void handleSuccessResponse(HttpServletResponse outboundResponse, URI url, HttpResponse inboundResponse) {
+        final int statusCode = inboundResponse.getStatusLine().getStatusCode();
+        this.prepareServletResponse(outboundResponse, statusCode);
+        this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+        // Audit the status that was actually received; any 2xx code reaches this method, not only 200.
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI,
+                ActionOutcome.SUCCESS, RES.responseStatus(statusCode));
+    }
+
+    /** Propagates a non-2xx status to the servlet response and audits the failure. */
+    private void handleErrorResponse(HttpServletResponse outboundResponse, URI url, HttpResponse httpResponse) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        outboundResponse.setStatus(statusCode);
+        LOG.dispatchResponseStatusCode(statusCode);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI,
+                ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+    }
+
+    private void prepareServletResponse(HttpServletResponse outboundResponse, int statusCode) {
+        LOG.dispatchResponseStatusCode(statusCode);
+        outboundResponse.setStatus(statusCode);
+        outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+    }
+
+    /** Returns true for status codes in the 2xx range. */
+    private boolean isSuccessful(int statusCode) {
+        return statusCode >= HttpStatus.SC_OK && statusCode < HttpStatus.SC_MULTIPLE_CHOICES;
+    }
+
+    private void closeProducer(HttpAsyncRequestProducer producer) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        }
+    }
+
+    /**
+     * Consumer that receives the upstream response in character chunks, lets the
+     * {@link SSEEntity} assemble events from them and writes finished events to the
+     * async servlet response.
+     */
+    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+        private SSEResponse sseResponse;
+        private final HttpServletResponse outboundResponse;
+        private final URI url;
+        private final AsyncContext asyncContext;
+
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, AsyncContext asyncContext) {
+            this.outboundResponse = outboundResponse;
+            this.url = url;
+            this.asyncContext = asyncContext;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse inboundResponse) {
+            this.sseResponse = new SSEResponse(inboundResponse);
+            if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) {
+                handleSuccessResponse(outboundResponse, url, inboundResponse);
+            } else {
+                handleErrorResponse(outboundResponse, url, inboundResponse);
+            }
+        }
+
+        @Override
+        protected void onCharReceived(final CharBuffer buf, final IOControl ioctl) {
+            try {
+                if (this.sseResponse.getEntity().readCharBuffer(buf)) {
+                    this.sseResponse.getEntity().sendEvent(this.asyncContext);
+                }
+            } catch (InterruptedException | IOException e) {
+                if (e instanceof InterruptedException) {
+                    // Restore the interrupt flag so code further up the stack can still observe it.
+                    Thread.currentThread().interrupt();
+                }
+                LOG.errorWritingOutputStream(e);
+                throw new SSEException(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        protected void releaseResources() {
+            this.asyncContext.complete();
+        }
+
+        @Override
+        protected SSEResponse buildResult(final HttpContext context) {
+            return this.sseResponse;
+        }
+    }
+
+    /** Closes the async client when it is closeable; a failure to close is only logged. */
+    @Override
+    public void destroy() {
+        try {
+            // instanceof is false for null, so no separate null check is needed.
+            if (this.asyncClient instanceof CloseableHttpAsyncClient) {
+                ((CloseableHttpAsyncClient) asyncClient).close();
+            }
+        } catch (IOException e) {
+            LOG.errorClosingHttpClient(e);
+        }
+    }
+
+    public HttpAsyncClient getAsyncClient() {
+        return this.asyncClient;
+    }
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
new file mode 100644
index 000000000..8b598c631
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Entity that incrementally parses a character stream into {@link SSEvent}s, queues the
+ * finished events and writes them to an async servlet response. All {@code HttpEntity}
+ * accessors are delegated to the wrapped entity.
+ */
+public class SSEEntity extends AbstractHttpEntity {
+
+    private static final String SSE_DELIMITER = ":";
+
+    private final BlockingQueue<SSEvent> eventQueue;
+    private final StringBuilder eventBuilder = new StringBuilder();
+    private final HttpEntity httpEntity;
+    private char previousChar = '0';
+
+    public SSEEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        this.eventQueue = new LinkedBlockingQueue<>();
+    }
+
+    /**
+     * Consumes every remaining character of the buffer and queues each event that is completed.
+     *
+     * @param charBuffer the next chunk of the event stream
+     * @return {@code true} when at least one event is waiting in the queue
+     */
+    public boolean readCharBuffer(CharBuffer charBuffer) {
+        while (charBuffer.hasRemaining()) {
+            processChar(charBuffer.get());
+        }
+        return !eventQueue.isEmpty();
+    }
+
+    //Two new line chars (\n\n) after each other means the event is finished streaming
+    //We can process it and add it to the event queue
+    //NOTE(review): '\r' and '\n' each count as a new line char here, so a single CRLF line
+    //ending is also treated as an event boundary - confirm upstream services only send LF.
+    private void processChar(char nextChar) {
+        if (isNewLineChar(nextChar) && isNewLineChar(previousChar)) {
+            processEvent();
+            eventBuilder.setLength(0);
+            previousChar = '0';
+        } else {
+            eventBuilder.append(nextChar);
+            previousChar = nextChar;
+        }
+    }
+
+    private boolean isNewLineChar(char c) {
+        return c == '\n' || c == '\r' || c == '\u0085' || c == '\u2028' || c == '\u2029';
+    }
+
+    /** Parses the buffered text line by line into one event and adds it to the queue. */
+    private void processEvent() {
+        String unprocessedEvent = eventBuilder.toString();
+        SSEvent ssEvent = new SSEvent();
+
+        for (String line : unprocessedEvent.split("\\R")) {
+            String[] lineTokens = this.parseLine(line);
+            String value = lineTokens[1].trim();
+            switch (lineTokens[0]) {
+                case "id":
+                    ssEvent.setId(value);
+                    break;
+                case "event":
+                    ssEvent.setEvent(value);
+                    break;
+                case "data":
+                    ssEvent.setData(value);
+                    break;
+                case "comment":
+                    ssEvent.setComment(value);
+                    break;
+                case "retry":
+                    try {
+                        ssEvent.setRetry(Long.parseLong(value));
+                    } catch (NumberFormatException ignored) {
+                        // A retry value that is not a number is skipped instead of
+                        // aborting the whole stream.
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+        eventQueue.add(ssEvent);
+    }
+
+    /**
+     * Splits a line into a field name and a value. The result always has two elements:
+     * a line starting with the delimiter is a comment whose value is the whole line, and
+     * a line without any delimiter is a field name with an empty value.
+     */
+    private String[] parseLine(String line) {
+        if (line.startsWith(SSE_DELIMITER)) {
+            return new String[] {"comment", line};
+        }
+
+        String[] lineTokens = line.split(SSE_DELIMITER, 2);
+        if (lineTokens.length == 2) {
+            return lineTokens;
+        }
+        // No delimiter in the line: without this the caller would index past the array end.
+        return new String[] {lineTokens[0], ""};
+    }
+
+    /**
+     * Writes every queued event to the response of the given async context, flushing after each.
+     *
+     * @param asyncContext async context whose response receives the events
+     * @throws IOException if writing or flushing the response fails
+     * @throws InterruptedException kept in the signature for existing callers
+     */
+    public void sendEvent(AsyncContext asyncContext) throws IOException, InterruptedException {
+        SSEvent event;
+        // poll() never blocks, so this thread cannot hang if the queue is drained elsewhere
+        // between the emptiness check and the removal.
+        while ((event = eventQueue.poll()) != null) {
+            asyncContext.getResponse().getWriter().write(event.toString());
+            asyncContext.getResponse().getWriter().println('\n');
+
+            //Calling response.flushBuffer() instead of writer.flush().
+            //This way an exception is thrown if the connection is already closed on the client side.
+            asyncContext.getResponse().flushBuffer();
+        }
+    }
+
+    public BlockingQueue<SSEvent> getEventQueue() {
+        return eventQueue;
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return httpEntity.isRepeatable();
+    }
+
+    @Override
+    public long getContentLength() {
+        return httpEntity.getContentLength();
+    }
+
+    @Override
+    public InputStream getContent() throws IOException, UnsupportedOperationException {
+        return httpEntity.getContent();
+    }
+
+    @Override
+    public void writeTo(OutputStream outStream) throws IOException {
+        httpEntity.writeTo(outStream);
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return httpEntity.isStreaming();
+    }
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java
new file mode 100644
index 000000000..a2bf9ae1b
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+/**
+ * Unchecked exception used to report a failure during SSE processing
+ * together with the exception that caused it.
+ */
+public class SSEException extends RuntimeException {
+
+ /**
+ * @param message description of the failure
+ * @param cause the underlying exception
+ */
+ public SSEException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
new file mode 100644
index 000000000..47b7bfa16
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.Header;
+import org.apache.http.HeaderIterator;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.message.BasicHttpResponse;
+
+import java.util.Locale;
+
+/**
+ * {@link HttpResponse} view used for SSE dispatching.
+ * <p>
+ * The instance keeps the inbound response it was created from and forwards
+ * every status-line, locale, protocol-version and header operation to it.
+ * Only the entity differs: {@link #getEntity()} returns an {@link SSEEntity}
+ * built once, in the constructor, around the inbound response's entity.
+ */
+public class SSEResponse extends BasicHttpResponse {
+  private final HttpResponse inboundResponse;
+  private final SSEEntity entity;
+
+  /**
+   * @param inboundResponse response to wrap; its status line seeds the
+   *                        superclass and its entity is wrapped in an
+   *                        {@link SSEEntity}
+   */
+  public SSEResponse(HttpResponse inboundResponse) {
+    super(inboundResponse.getStatusLine());
+    this.inboundResponse = inboundResponse;
+    this.entity = new SSEEntity(inboundResponse.getEntity());
+  }
+
+  /** @return the SSE entity created in the constructor */
+  @Override
+  public SSEEntity getEntity() {
+    return entity;
+  }
+
+  // ---- status line: forwarded to the inbound response ----
+
+  @Override
+  public StatusLine getStatusLine() {
+    return inboundResponse.getStatusLine();
+  }
+
+  @Override
+  public void setStatusLine(StatusLine statusline) {
+    inboundResponse.setStatusLine(statusline);
+  }
+
+  @Override
+  public void setStatusLine(ProtocolVersion ver, int code) {
+    inboundResponse.setStatusLine(ver, code);
+  }
+
+  @Override
+  public void setStatusLine(ProtocolVersion ver, int code, String reason) {
+    inboundResponse.setStatusLine(ver, code, reason);
+  }
+
+  @Override
+  public void setStatusCode(int code) throws IllegalStateException {
+    inboundResponse.setStatusCode(code);
+  }
+
+  @Override
+  public void setReasonPhrase(String reason) throws IllegalStateException {
+    inboundResponse.setReasonPhrase(reason);
+  }
+
+  // ---- locale and protocol version: forwarded to the inbound response ----
+
+  @Override
+  public Locale getLocale() {
+    return inboundResponse.getLocale();
+  }
+
+  @Override
+  public void setLocale(Locale loc) {
+    inboundResponse.setLocale(loc);
+  }
+
+  @Override
+  public ProtocolVersion getProtocolVersion() {
+    return inboundResponse.getProtocolVersion();
+  }
+
+  // ---- header queries: forwarded to the inbound response ----
+
+  @Override
+  public boolean containsHeader(String name) {
+    return inboundResponse.containsHeader(name);
+  }
+
+  @Override
+  public Header[] getHeaders(String name) {
+    return inboundResponse.getHeaders(name);
+  }
+
+  @Override
+  public Header getFirstHeader(String name) {
+    return inboundResponse.getFirstHeader(name);
+  }
+
+  @Override
+  public Header getLastHeader(String name) {
+    return inboundResponse.getLastHeader(name);
+  }
+
+  @Override
+  public Header[] getAllHeaders() {
+    return inboundResponse.getAllHeaders();
+  }
+
+  // ---- header mutation: forwarded to the inbound response ----
+
+  @Override
+  public void addHeader(Header header) {
+    inboundResponse.addHeader(header);
+  }
+
+  @Override
+  public void addHeader(String name, String value) {
+    inboundResponse.addHeader(name, value);
+  }
+
+  @Override
+  public void setHeader(Header header) {
+    inboundResponse.setHeader(header);
+  }
+
+  @Override
+  public void setHeader(String name, String value) {
+    inboundResponse.setHeader(name, value);
+  }
+
+  @Override
+  public void setHeaders(Header[] headers) {
+    inboundResponse.setHeaders(headers);
+  }
+
+  @Override
+  public void removeHeader(Header header) {
+    inboundResponse.removeHeader(header);
+  }
+
+  @Override
+  public void removeHeaders(String name) {
+    inboundResponse.removeHeaders(name);
+  }
+
+  // ---- header iteration: forwarded to the inbound response ----
+
+  @Override
+  public HeaderIterator headerIterator() {
+    return inboundResponse.headerIterator();
+  }
+
+  @Override
+  public HeaderIterator headerIterator(String name) {
+    return inboundResponse.headerIterator(name);
+  }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
new file mode 100644
index 000000000..dc643ed0e
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+/**
+ * Mutable holder for the fields of a single server-sent event: {@code data},
+ * {@code event}, {@code id}, {@code comment} and {@code retry}. Every field
+ * may be {@code null}, meaning "not set".
+ * <p>
+ * {@link #toString()} renders the set fields in event-stream field syntax.
+ */
+public class SSEvent {
+
+  private String data;
+  private String event;
+  private String id;
+  private String comment;
+  private Long retry;
+
+  /** Creates an event with every field unset. */
+  public SSEvent() {
+  }
+
+  /**
+   * Creates an event with the given field values; any argument may be
+   * {@code null} to leave that field unset.
+   *
+   * @param data    event payload
+   * @param event   event type name
+   * @param id      event id
+   * @param comment comment line; it is serialised without any prefix, so it
+   *                is presumably expected to already start with ':'
+   * @param retry   retry value, serialised after {@code retry:}
+   */
+  public SSEvent(String data, String event, String id, String comment, Long retry) {
+    this.data = data;
+    this.event = event;
+    this.id = id;
+    this.comment = comment;
+    this.retry = retry;
+  }
+
+  public void setData(String data) {
+    this.data = data;
+  }
+
+  public void setEvent(String event) {
+    this.event = event;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public void setComment(String comment) {
+    this.comment = comment;
+  }
+
+  public void setRetry(Long retry) {
+    this.retry = retry;
+  }
+
+  public String getData() {
+    return this.data;
+  }
+
+  public String getEvent() {
+    return this.event;
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public String getComment() {
+    return this.comment;
+  }
+
+  public Long getRetry() {
+    return this.retry;
+  }
+
+  /**
+   * Serialises the event as newline-separated field lines in the fixed order
+   * id, event, data, retry, comment. Unset ({@code null}) fields are skipped
+   * and no trailing newline is written, so an event with no fields yields an
+   * empty string.
+   * <p>
+   * A payload that contains line breaks is written as one {@code data:} line
+   * per payload line; writing it after a single prefix would produce lines
+   * that are no longer part of the data field.
+   *
+   * @return the field lines joined with '\n'
+   */
+  @Override
+  public String toString() {
+    StringBuilder eventString = new StringBuilder();
+
+    this.appendField(eventString, this.id, "id:");
+    this.appendField(eventString, this.event, "event:");
+    this.appendData(eventString);
+    this.appendField(eventString, this.retry, "retry:");
+    // The comment is written verbatim, without a field-name prefix.
+    this.appendField(eventString, this.comment, "");
+
+    return eventString.toString();
+  }
+
+  /** Appends the payload, one {@code data:} line per line of {@code data}. */
+  private void appendData(StringBuilder eventString) {
+    if (this.data == null) {
+      return;
+    }
+    // Limit -1 keeps trailing empty lines, so a payload ending in a line
+    // break is not silently shortened.
+    for (String line : this.data.split("\r\n|\r|\n", -1)) {
+      this.appendField(eventString, line, "data:");
+    }
+  }
+
+  /**
+   * Appends {@code prefix + field} as a new line when {@code field} is set,
+   * inserting the '\n' separator only if something was written before.
+   */
+  private void appendField(StringBuilder eventString, Object field, String prefix) {
+    if (field != null) {
+      if (eventString.length() != 0) {
+        eventString.append('\n');
+      }
+      eventString.append(prefix);
+      eventString.append(field);
+    }
+  }
+}
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
new file mode 100644
index 000000000..001f830c2
--- /dev/null
+++
b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.junit.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Checks that {@link DefaultHttpAsyncClientFactory} builds a client when the
+ * filter configuration overrides nothing and the keystore service returns no
+ * truststore.
+ */
+public class DefaultHttpAsyncClientFactoryTest {
+
+  @Test
+  public void testCreateHttpAsyncClientSSLContextDefaults() throws Exception {
+    // Keystore service: no truststore is available for the HTTP client.
+    KeystoreService keystoreService = createMock(KeystoreService.class);
+    expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+    // Gateway-level settings returned by the mocked gateway configuration.
+    GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+    expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+    expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+    expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+    expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+    expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+    GatewayServices gatewayServices = createMock(GatewayServices.class);
+    expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+    // The servlet context exposes the configuration and the services.
+    ServletContext servletContext = createMock(ServletContext.class);
+    expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+    expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+    // Filter init parameters: two-way SSL is off, every other one is absent.
+    FilterConfig filterConfig = createMock(FilterConfig.class);
+    expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+    expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+    expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+    replay(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig);
+
+    HttpAsyncClient client = new DefaultHttpAsyncClientFactory().createAsyncHttpClient(filterConfig);
+    assertNotNull(client);
+
+    verify(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig);
+  }
+}
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
new file mode 100644
index 000000000..66abcb791
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.test.mock.MockServer;
+import org.apache.knox.test.mock.MockServletContext;
+import org.apache.knox.test.mock.MockServletInputStream;
+import org.easymock.EasyMock;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link SSEDispatch} against a local {@link MockServer}.
+ * <p>
+ * For each of GET, POST, PUT and PATCH there is one test where the upstream
+ * answers 200 with an event stream and two where it answers with an error
+ * status. The per-method tests only differ in the HTTP method, the expected
+ * status and whether a request body is expected upstream, so they share the
+ * two helpers {@link #assertEventStreamRelayed} and
+ * {@link #assertErrorStatusRelayed}.
+ */
+public class SSEDispatchTest {
+
+  /** Body served by the mocked inbound request and expected upstream. */
+  private static final String REQUEST_BODY = "{\"request\":\"body\"}";
+
+  /** Event stream the mock server answers with in the 2xx tests. */
+  private static final String UPSTREAM_EVENT_STREAM =
+      "id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n";
+
+  private static MockServer MOCK_SSE_SERVER;
+  private static URI URL;
+
+  /** One of the {@code SSEDispatch.doXxx(url, request, response)} calls. */
+  @FunctionalInterface
+  private interface DispatchCall {
+    void invoke(SSEDispatch dispatch, URI url, HttpServletRequest request, HttpServletResponse response)
+        throws Exception;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    MOCK_SSE_SERVER = new MockServer("SSE", true);
+    URL = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse");
+  }
+
+  @Test
+  public void testCreateAndDestroyClient() throws Exception {
+    SSEDispatch sseDispatch = this.createDispatch();
+    assertNotNull(sseDispatch.getAsyncClient());
+
+    sseDispatch.destroy();
+    assertFalse(((CloseableHttpAsyncClient) sseDispatch.getAsyncClient()).isRunning());
+  }
+
+  @Test
+  public void testGet2xx() throws Exception {
+    this.assertEventStreamRelayed("GET", false, SSEDispatch::doGet);
+  }
+
+  @Test
+  public void testGet4xx() throws Exception {
+    this.assertErrorStatusRelayed("GET", HttpStatus.SC_BAD_REQUEST, SSEDispatch::doGet);
+  }
+
+  @Test
+  public void testGet5xx() throws Exception {
+    this.assertErrorStatusRelayed("GET", HttpStatus.SC_INTERNAL_SERVER_ERROR, SSEDispatch::doGet);
+  }
+
+  @Test
+  public void testPost2xx() throws Exception {
+    this.assertEventStreamRelayed("POST", true, SSEDispatch::doPost);
+  }
+
+  @Test
+  public void testPost4xx() throws Exception {
+    this.assertErrorStatusRelayed("POST", HttpStatus.SC_NOT_FOUND, SSEDispatch::doPost);
+  }
+
+  @Test
+  public void testPost5xx() throws Exception {
+    this.assertErrorStatusRelayed("POST", HttpStatus.SC_INTERNAL_SERVER_ERROR, SSEDispatch::doPost);
+  }
+
+  @Test
+  public void testPut2xx() throws Exception {
+    this.assertEventStreamRelayed("PUT", true, SSEDispatch::doPut);
+  }
+
+  @Test
+  public void testPut4xx() throws Exception {
+    this.assertErrorStatusRelayed("PUT", HttpStatus.SC_NOT_FOUND, SSEDispatch::doPut);
+  }
+
+  @Test
+  public void testPut5xx() throws Exception {
+    this.assertErrorStatusRelayed("PUT", HttpStatus.SC_INTERNAL_SERVER_ERROR, SSEDispatch::doPut);
+  }
+
+  @Test
+  public void testPatch2xx() throws Exception {
+    this.assertEventStreamRelayed("PATCH", true, SSEDispatch::doPatch);
+  }
+
+  @Test
+  public void testPatch4xx() throws Exception {
+    this.assertErrorStatusRelayed("PATCH", HttpStatus.SC_NOT_FOUND, SSEDispatch::doPatch);
+  }
+
+  @Test
+  public void testPatch5xx() throws Exception {
+    this.assertErrorStatusRelayed("PATCH", HttpStatus.SC_INTERNAL_SERVER_ERROR, SSEDispatch::doPatch);
+  }
+
+  @Test
+  public void testServerNotAvailable() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    SSEDispatch sseDispatch = this.createDispatch();
+    HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+    AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+    HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+    replay(inboundRequest, asyncContext, outboundResponse);
+
+    // This URL does not point at the mock server created for this class.
+    sseDispatch.doGet(new URI("http://localhost:11223/sse"), inboundRequest, outboundResponse);
+
+    latch.await(1L, TimeUnit.SECONDS);
+    EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+  }
+
+  /**
+   * Runs one 200 scenario: the mock server answers with
+   * {@link #UPSTREAM_EVENT_STREAM} and the test verifies the expectations set
+   * by {@link #expectResponseBodyAndHeader} on the outbound response and its
+   * writer, plus that the mock server has no expectation left.
+   *
+   * @param method          HTTP method the mock server must receive
+   * @param withRequestBody whether the mock server must also receive {@link #REQUEST_BODY}
+   * @param call            the dispatch method under test
+   */
+  private void assertEventStreamRelayed(String method, boolean withRequestBody, DispatchCall call) throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    SSEDispatch sseDispatch = this.createDispatch();
+    PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+    HttpServletResponse outboundResponse = this.getServletResponse(HttpStatus.SC_OK);
+    AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+    HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+    this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+    replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+    this.expectEventStreamExchange(method, withRequestBody);
+
+    call.invoke(sseDispatch, URL, inboundRequest, outboundResponse);
+
+    // The latch is released by asyncContext.complete(); see getAsyncContext.
+    latch.await(1L, TimeUnit.SECONDS);
+    EasyMock.verify(asyncContext, outboundResponse, inboundRequest, printWriter);
+    assertTrue(MOCK_SSE_SERVER.isEmpty());
+  }
+
+  /**
+   * Runs one error scenario: the mock server answers {@code statusCode} and
+   * the test verifies that the same status is set on the outbound response
+   * and that the mock server has no expectation left.
+   *
+   * @param method     HTTP method the mock server must receive
+   * @param statusCode status returned upstream and expected downstream
+   * @param call       the dispatch method under test
+   */
+  private void assertErrorStatusRelayed(String method, int statusCode, DispatchCall call) throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    SSEDispatch sseDispatch = this.createDispatch();
+    HttpServletResponse outboundResponse = this.getServletResponse(statusCode);
+    AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+    HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+    replay(inboundRequest, asyncContext, outboundResponse);
+
+    MOCK_SSE_SERVER.expect()
+        .method(method)
+        .pathInfo("/sse")
+        .respond()
+        .status(statusCode);
+
+    call.invoke(sseDispatch, URL, inboundRequest, outboundResponse);
+
+    latch.await(1L, TimeUnit.SECONDS);
+    EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+    assertTrue(MOCK_SSE_SERVER.isEmpty());
+  }
+
+  /**
+   * Registers the mock-server interaction of a 200 scenario. The two branches
+   * differ only in the request-body expectation.
+   */
+  private void expectEventStreamExchange(String method, boolean withRequestBody) throws Exception {
+    if (withRequestBody) {
+      MOCK_SSE_SERVER.expect()
+          .method(method)
+          .pathInfo("/sse")
+          .header("request", "header")
+          .header("Accept", "text/event-stream")
+          .content(REQUEST_BODY, StandardCharsets.UTF_8)
+          .respond()
+          .status(HttpStatus.SC_OK)
+          .content(UPSTREAM_EVENT_STREAM, StandardCharsets.UTF_8)
+          .header("response", "header")
+          .contentType("text/event-stream");
+    } else {
+      MOCK_SSE_SERVER.expect()
+          .method(method)
+          .pathInfo("/sse")
+          .header("request", "header")
+          .header("Accept", "text/event-stream")
+          .respond()
+          .status(HttpStatus.SC_OK)
+          .content(UPSTREAM_EVENT_STREAM, StandardCharsets.UTF_8)
+          .header("response", "header")
+          .contentType("text/event-stream");
+    }
+  }
+
+  /**
+   * Builds the (not yet replayed) inbound request mock: one header
+   * {@code request: header}, JSON content type, {@link #REQUEST_BODY} as
+   * input stream, and {@code startAsync()} returning {@code asyncContext}.
+   */
+  private HttpServletRequest getHttpServletRequest(AsyncContext asyncContext) throws Exception {
+    Map<String, String> headers = new HashMap<>();
+    headers.put("request", "header");
+    InputStream stream = new ByteArrayInputStream(REQUEST_BODY.getBytes(StandardCharsets.UTF_8));
+    MockServletInputStream mockServletInputStream = new MockServletInputStream(stream);
+    HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class);
+
+    EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes();
+    EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).once();
+    EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once();
+    EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes();
+    EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes();
+    EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes();
+    EasyMock.expect(inboundRequest.getServletContext()).andReturn(new MockServletContext()).anyTimes();
+
+    return inboundRequest;
+  }
+
+  /**
+   * Builds the (not yet replayed) async context mock. It hands out
+   * {@code outboundResponse} and counts {@code latch} down when
+   * {@code complete()} is called, which lets a test wait for completion.
+   */
+  private AsyncContext getAsyncContext(CountDownLatch latch, HttpServletResponse outboundResponse) {
+    AsyncContext asyncContext = EasyMock.createNiceMock(AsyncContext.class);
+
+    EasyMock.expect(asyncContext.getResponse()).andReturn(outboundResponse).anyTimes();
+    asyncContext.complete();
+    EasyMock.expectLastCall().andAnswer(() -> {
+      latch.countDown();
+      return null;
+    });
+
+    return asyncContext;
+  }
+
+  /** Builds the (not yet replayed) outbound response mock expecting {@code setStatus(statusCode)}. */
+  private HttpServletResponse getServletResponse(int statusCode) {
+    HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+
+    outboundResponse.setStatus(statusCode);
+    EasyMock.expectLastCall();
+
+    return outboundResponse;
+  }
+
+  /**
+   * Records what a 200 scenario must write downstream: the
+   * {@code response: header} header, the two serialised events, and
+   * {@code println('\n')} twice.
+   */
+  private void expectResponseBodyAndHeader(PrintWriter printWriter, HttpServletResponse outboundResponse) throws Exception {
+    outboundResponse.addHeader("response", "header");
+    EasyMock.expectLastCall();
+    EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
+    printWriter.write("id:1\nevent:event1\ndata:data1");
+    EasyMock.expectLastCall();
+    printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing");
+    EasyMock.expectLastCall();
+    printWriter.println('\n');
+    EasyMock.expectLastCall().times(2);
+  }
+
+  /**
+   * Creates an {@link SSEDispatch} from a filter configuration that overrides
+   * no HTTP client setting and whose keystore service returns no truststore.
+   */
+  private SSEDispatch createDispatch() throws Exception {
+    KeystoreService keystoreService = createMock(KeystoreService.class);
+    expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+    GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+    expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+    expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+    expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+    expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+    expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+    GatewayServices gatewayServices = createMock(GatewayServices.class);
+    expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+    ServletContext servletContext = createMock(ServletContext.class);
+    expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+    expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+    FilterConfig filterConfig = createMock(FilterConfig.class);
+    expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+    expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+    expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+    expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+    replay(keystoreService, gatewayConfig, gatewayServices, servletContext, filterConfig);
+
+    return new SSEDispatch(filterConfig);
+  }
+}
\ No newline at end of file
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
new file mode 100644
index 000000000..1c0375d9a
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class SSEEntityTest { // Exercises SSEEntity.readCharBuffer: raw SSE text goes in, parsed SSEvent objects are expected on the entity's event queue.
+ // Nice mock shared by every test; it is only passed to the SSEEntity constructor and no expectations are recorded on it in this class.
+ private final HttpEntity entityMock =
EasyMock.createNiceMock(HttpEntity.class);
+ // One complete event terminated by a blank line is expected to be queued with id, event, data, retry and comment all set.
+ @Test
+ public void testParseSingleEvent() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvent = "id: 1\nevent: event\ndata:
data\nretry:1\n:testing\n\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertFalse(eventQueue.isEmpty());
+ // peek() leaves the event on the queue; the assertions below only inspect it.
+ SSEvent actualSSEvent = eventQueue.peek();
+ assertEquals("1", actualSSEvent.getId()); // the space after "id:" in the input is not expected in the parsed value
+ assertEquals("event", actualSSEvent.getEvent());
+ assertEquals("data", actualSSEvent.getData());
+ assertEquals(1L, actualSSEvent.getRetry().longValue());
+ assertEquals(":testing", actualSSEvent.getComment()); // the comment is expected to keep its leading colon
+ }
+ // Three blank-line-separated events in a single buffer are expected to be queued in input order.
+ @Test
+ public void testParseMultipleEvents() {
+ SSEEntity sseEntity = new SSEEntity(this.entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvents = "id: 1\nevent: event\ndata: data\n\nid:
2\nevent: event2\ndata: data2\n\nid: 3\nevent: event3\ndata:
data3\nretry:1045\n:TEST\n\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvents);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertEquals(3, eventQueue.size());
+ // poll() removes the head each time, so the three groups of assertions walk the queue front to back.
+ SSEvent actualSSEvent = eventQueue.poll();
+ assertEquals("1", actualSSEvent.getId());
+ assertEquals("event", actualSSEvent.getEvent());
+ assertEquals("data", actualSSEvent.getData());
+ assertNull(actualSSEvent.getRetry()); // first event carries no retry field in the input
+ assertNull(actualSSEvent.getComment()); // and no comment line either
+
+ actualSSEvent = eventQueue.poll();
+ assertEquals("2", actualSSEvent.getId());
+ assertEquals("event2", actualSSEvent.getEvent());
+ assertEquals("data2", actualSSEvent.getData());
+ assertNull(actualSSEvent.getRetry());
+ assertNull(actualSSEvent.getComment());
+ // Only the third event in the input has retry and comment lines.
+ actualSSEvent = eventQueue.poll();
+ assertEquals("3", actualSSEvent.getId());
+ assertEquals("event3", actualSSEvent.getEvent());
+ assertEquals("data3", actualSSEvent.getData());
+ assertEquals(1045L, actualSSEvent.getRetry().longValue());
+ assertEquals(":TEST", actualSSEvent.getComment());
+ }
+ // The input ends after a single line feed with no blank line, so no event is expected to be queued.
+ @Test
+ public void testMissingNewLine() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvent = "id: 1\nevent: event\ndata:
data\nretry:1045\n:TEST\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertTrue(eventQueue.isEmpty());
+ }
+ // Values that themselves contain colons (an id with ':' and a data payload starting with "data:") are expected to survive parsing unchanged.
+ @Test
+ public void testParseEventWithSpecialChars() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvent = "id:
75a0a510-0065-498f:be39-c6f42a3fe4af\ndata:
data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}\nretry:33\n::::test\n\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertFalse(eventQueue.isEmpty());
+
+ SSEvent actualSSEvent = eventQueue.peek();
+ assertEquals("75a0a510-0065-498f:be39-c6f42a3fe4af",
actualSSEvent.getId());
+ assertNull(actualSSEvent.getEvent()); // the input has no "event:" line
+
assertEquals("data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}",
actualSSEvent.getData());
+ assertEquals(33L, actualSSEvent.getRetry().longValue());
+ assertEquals("::::test", actualSSEvent.getComment()); // all four leading colons are expected to be kept
+ }
+ // Two field groups run together with no blank line between or after them; nothing is expected to be queued.
+ @Test
+ public void testInvalidFormat() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvent = "id: 1\nevent: event\ndata:
data\nretry:1045\n:TEST\nid: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertTrue(eventQueue.isEmpty());
+ }
+ // Each of the four events ends with a different break sequence (CR LF, U+2028 LF, U+2029 LF, U+0085 LF) rather than two line feeds; all four are expected to be queued.
+ @Test
+ public void testParseEventsWithDifferentNewLineChars() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvents = "id: 1\nevent: event\ndata: data\r\nid:
2\nevent: event2\ndata: data2\u2028\nid: 3\nevent: event3\ndata:
data3\u2029\nid: 4\nevent: event4\ndata: data4\u0085\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvents);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertEquals(4, eventQueue.size());
+ // The data values asserted below do not include the break character that ended each event.
+ SSEvent actualSSEvent = eventQueue.poll();
+ assertEquals("1", actualSSEvent.getId());
+ assertEquals("event", actualSSEvent.getEvent());
+ assertEquals("data", actualSSEvent.getData());
+
+ actualSSEvent = eventQueue.poll();
+ assertEquals("2", actualSSEvent.getId());
+ assertEquals("event2", actualSSEvent.getEvent());
+ assertEquals("data2", actualSSEvent.getData());
+
+ actualSSEvent = eventQueue.poll();
+ assertEquals("3", actualSSEvent.getId());
+ assertEquals("event3", actualSSEvent.getEvent());
+ assertEquals("data3", actualSSEvent.getData());
+
+ actualSSEvent = eventQueue.poll();
+ assertEquals("4", actualSSEvent.getId());
+ assertEquals("event4", actualSSEvent.getEvent());
+ assertEquals("data4", actualSSEvent.getData());
+ }
+}
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
new file mode 100644
index 000000000..378726ab3
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SSEventTest { // Pins the text produced by SSEvent.toString() for each combination of present, null and empty fields.
+ // Constructor arguments as used throughout this class: (data, event, id, comment, retry).
+ // With every field set, the expected output order is id, event, data, retry, comment, joined by line feeds with no trailing line feed.
+ @Test
+ public void testToStringWithAll() {
+ SSEvent ssEvent = new SSEvent("data", "event", "id", ":comment", 5L);
+ String expected = "id:id\nevent:event\ndata:data\nretry:5\n:comment";
+
+ assertEquals(expected, ssEvent.toString());
+ }
+ // A null id is expected to drop the "id:" line entirely, while an empty id still yields an "id:" line with no value.
+ @Test
+ public void testToStringNoId() {
+ SSEvent ssEventNull = new SSEvent("data", "event", null, ":test", 2L);
+ SSEvent ssEventEmpty = new SSEvent("data", "event", "", ":new
comment", 1L);
+ String expectedForNull = "event:event\ndata:data\nretry:2\n:test";
+ String expectedForEmpty = "id:\nevent:event\ndata:data\nretry:1\n:new
comment";
+
+ assertEquals(expectedForNull, ssEventNull.toString());
+ assertEquals(expectedForEmpty, ssEventEmpty.toString());
+ }
+ // Same null-versus-empty distinction, applied to the event field.
+ @Test
+ public void testToStringNoEvent() {
+ SSEvent ssEventNull = new SSEvent("data", null, "id", ":comment", 11L);
+ SSEvent ssEventEmpty = new SSEvent("data", "", "id", ":test comment",
30L);
+ String expectedForNull = "id:id\ndata:data\nretry:11\n:comment";
+ String expectedForEmpty = "id:id\nevent:\ndata:data\nretry:30\n:test
comment";
+
+ assertEquals(expectedForNull, ssEventNull.toString());
+ assertEquals(expectedForEmpty, ssEventEmpty.toString());
+ }
+ // Same null-versus-empty distinction, applied to the data field.
+ @Test
+ public void testToStringNoData() {
+ SSEvent ssEventNull = new SSEvent(null, "event", "id", ":comment", 2L);
+ SSEvent ssEventEmpty = new SSEvent("", "event", "id", ":testing", 1L);
+ String expectedForNull = "id:id\nevent:event\nretry:2\n:comment";
+ String expectedForEmpty =
"id:id\nevent:event\ndata:\nretry:1\n:testing";
+
+ assertEquals(expectedForNull, ssEventNull.toString());
+ assertEquals(expectedForEmpty, ssEventEmpty.toString());
+ }
+ // A null comment is expected to leave the output ending at the retry line.
+ @Test
+ public void testToStringNoComment() {
+ SSEvent ssEventNull = new SSEvent("data", "event", "id", null, 3L);
+ String expected = "id:id\nevent:event\ndata:data\nretry:3";
+
+ assertEquals(expected, ssEventNull.toString());
+ }
+ // A null retry is expected to omit the "retry:" line, so the comment directly follows the data line.
+ @Test
+ public void testToStringNoRetry() {
+ SSEvent ssEventNull = new SSEvent("data", "event", "id", ":TEST",
null);
+ String expected = "id:id\nevent:event\ndata:data\n:TEST";
+
+ assertEquals(expected, ssEventNull.toString());
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 084b6098b..75bd5155f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,7 @@
<ant-nodeps.version>1.8.1</ant-nodeps.version>
<asm.version>9.0</asm.version>
<aspectj.version>1.9.6</aspectj.version>
+ <asynchttpclient.version>4.1.5</asynchttpclient.version>
<bcprov-jdk15on.version>1.67</bcprov-jdk15on.version>
<ben-manes.caffeine.version>2.8.8</ben-manes.caffeine.version>
<buildnumber-maven-plugin.version>1.4</buildnumber-maven-plugin.version>
@@ -1549,6 +1550,16 @@
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>${asynchttpclient.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
<dependency>
<groupId>joda-time</groupId>