This is an automated email from the ASF dual-hosted git repository.

pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 58dd02b02 KNOX-3065: SSE support for Knox (#947)
58dd02b02 is described below

commit 58dd02b02c62b4bb5f54edd9d91ff29d4d616fb2
Author: hanicz <[email protected]>
AuthorDate: Mon Nov 11 14:46:31 2024 +0100

    KNOX-3065: SSE support for Knox (#947)
---
 gateway-spi/pom.xml                                |  18 +
 .../apache/knox/gateway/SpiGatewayMessages.java    |  15 +
 .../dispatch/DefaultHttpAsyncClientFactory.java    |  87 ++++
 .../gateway/dispatch/DefaultHttpClientFactory.java |  18 +-
 .../gateway/dispatch/GatewayDispatchFilter.java    |  20 +-
 .../gateway/dispatch/HttpAsyncClientFactory.java   |  26 ++
 .../org/apache/knox/gateway/sse/SSEDispatch.java   | 234 ++++++++++
 .../org/apache/knox/gateway/sse/SSEEntity.java     | 150 +++++++
 .../org/apache/knox/gateway/sse/SSEException.java  |  25 ++
 .../org/apache/knox/gateway/sse/SSEResponse.java   | 158 +++++++
 .../java/org/apache/knox/gateway/sse/SSEvent.java  | 101 +++++
 .../DefaultHttpAsyncClientFactoryTest.java         |  76 ++++
 .../apache/knox/gateway/sse/SSEDispatchTest.java   | 483 +++++++++++++++++++++
 .../org/apache/knox/gateway/sse/SSEEntityTest.java | 163 +++++++
 .../org/apache/knox/gateway/sse/SSEventTest.java   |  83 ++++
 pom.xml                                            |  11 +
 16 files changed, 1656 insertions(+), 12 deletions(-)

diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml
index b6151b0bf..d09c39bf4 100644
--- a/gateway-spi/pom.xml
+++ b/gateway-spi/pom.xml
@@ -55,6 +55,24 @@
             <artifactId>gateway-util-urltemplate</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore-nio</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+        </dependency>
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
index 771e616f4..4440069d8 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
@@ -123,4 +123,19 @@ public interface SpiGatewayMessages {
 
   @Message( level = MessageLevel.ERROR, text = "No valid principal found" )
   void noPrincipalFound();
+
+  @Message( level = MessageLevel.INFO, text = "Every event was read from the 
SSE stream" )
+  void sseConnectionDone();
+
+  @Message( level = MessageLevel.ERROR, text = "Error during SSE connection: 
{0}" )
+  void sseConnectionError(String error);
+
+  @Message( level = MessageLevel.ERROR, text = "Error writing into the SSE 
output stream : {0}" )
+  void errorWritingOutputStream(@StackTrace(level=MessageLevel.ERROR) 
Exception e);
+
+  @Message( level = MessageLevel.WARN, text = "SSE connection cancelled" )
+  void sseConnectionCancelled();
+
+  @Message( level = MessageLevel.ERROR, text = "Unable to close SSE producer" )
+  void sseProducerCloseError(@StackTrace(level=MessageLevel.ERROR) Exception 
e);
 }
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java
new file mode 100644
index 000000000..24840233f
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.metrics.MetricsService;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.FilterConfig;
+
+public class DefaultHttpAsyncClientFactory extends DefaultHttpClientFactory 
implements HttpAsyncClientFactory {
+
+    @Override
+    public HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig) {
+        final String serviceRole = 
filterConfig.getInitParameter(PARAMETER_SERVICE_ROLE);
+        HttpAsyncClientBuilder builder;
+        GatewayConfig gatewayConfig = (GatewayConfig) 
filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+        GatewayServices services = (GatewayServices) 
filterConfig.getServletContext()
+                .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
+        if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) {
+            MetricsService metricsService = 
services.getService(ServiceType.METRICS_SERVICE);
+            builder = 
metricsService.getInstrumented(HttpAsyncClientBuilder.class);
+        } else {
+            builder = HttpAsyncClients.custom();
+        }
+
+        // Conditionally set a custom SSLContext
+        SSLContext sslContext = createSSLContext(services, filterConfig, 
serviceRole);
+        if (sslContext != null) {
+            builder.setSSLContext(sslContext);
+        }
+
+        if 
(Boolean.parseBoolean(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED)))
 {
+            CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new 
DefaultHttpAsyncClientFactory.UseJaasCredentials());
+
+            Registry<AuthSchemeProvider> authSchemeRegistry = 
RegistryBuilder.<AuthSchemeProvider>create()
+                    .register(AuthSchemes.SPNEGO, new 
KnoxSpnegoAuthSchemeFactory(true))
+                    .build();
+
+            builder.setDefaultAuthSchemeRegistry(authSchemeRegistry)
+                    .setDefaultCookieStore(new 
HadoopAuthCookieStore(gatewayConfig))
+                    .setDefaultCredentialsProvider(credentialsProvider);
+        } else {
+            builder.setDefaultCookieStore(new 
DefaultHttpAsyncClientFactory.NoCookieStore());
+        }
+
+        
builder.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE);
+        
builder.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE);
+        builder.setRedirectStrategy(new 
DefaultHttpAsyncClientFactory.NeverRedirectStrategy());
+        int maxConnections = getMaxConnections(filterConfig);
+        builder.setMaxConnTotal(maxConnections);
+        builder.setMaxConnPerRoute(maxConnections);
+        builder.setDefaultRequestConfig(getRequestConfig(filterConfig, 
serviceRole));
+
+        return builder.build();
+    }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
index 03a4f891a..cb658c8fe 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/DefaultHttpClientFactory.java
@@ -69,7 +69,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
 
 public class DefaultHttpClientFactory implements HttpClientFactory {
   private static final SpiGatewayMessages LOG = 
MessagesFactory.get(SpiGatewayMessages.class);
-  private static final String PARAMETER_SERVICE_ROLE = "serviceRole";
+  protected static final String PARAMETER_SERVICE_ROLE = "serviceRole";
   static final String PARAMETER_USE_TWO_WAY_SSL = "useTwoWaySsl";
   /* retry in case of NoHttpResponseException */
   static final String PARAMETER_RETRY_COUNT = "retryCount";
@@ -255,7 +255,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     return builder.build();
   }
 
-  private static class NoCookieStore implements CookieStore {
+  protected static class NoCookieStore implements CookieStore {
     @Override
     public void addCookie(Cookie cookie) {
       //no op
@@ -277,7 +277,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     }
   }
 
-  private static class NeverRedirectStrategy implements RedirectStrategy {
+  protected static class NeverRedirectStrategy implements RedirectStrategy {
     @Override
     public boolean isRedirected( HttpRequest request, HttpResponse response, 
HttpContext context )
         throws ProtocolException {
@@ -298,7 +298,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     }
   }
 
-  private static class UseJaasCredentials implements Credentials {
+  protected static class UseJaasCredentials implements Credentials {
 
     @Override
     public String getPassword() {
@@ -312,7 +312,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
 
   }
 
-  private int getMaxConnections( FilterConfig filterConfig ) {
+  protected int getMaxConnections( FilterConfig filterConfig ) {
     int maxConnections = 32;
     GatewayConfig config =
         (GatewayConfig)filterConfig.getServletContext().getAttribute( 
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -330,7 +330,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     return maxConnections;
   }
 
-  private static int getConnectionTimeout( FilterConfig filterConfig ) {
+  protected static int getConnectionTimeout( FilterConfig filterConfig ) {
     int timeout = -1;
     GatewayConfig globalConfig =
         (GatewayConfig)filterConfig.getServletContext().getAttribute( 
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -348,7 +348,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     return timeout;
   }
 
-  private static int getSocketTimeout( FilterConfig filterConfig ) {
+  protected static int getSocketTimeout( FilterConfig filterConfig ) {
     int timeout = -1;
     GatewayConfig globalConfig =
         (GatewayConfig)filterConfig.getServletContext().getAttribute( 
GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE );
@@ -366,7 +366,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     return timeout;
   }
 
-  private static long parseTimeout( String s ) {
+  protected static long parseTimeout( String s ) {
     PeriodFormatter f = new PeriodFormatterBuilder()
         .appendMinutes().appendSuffix("m"," min")
         .appendSeconds().appendSuffix("s"," sec")
@@ -375,7 +375,7 @@ public class DefaultHttpClientFactory implements HttpClientFactory {
     return p.toStandardDuration().getMillis();
   }
 
-  private static String getCookieSpec(FilterConfig filterConfig) {
+  protected static String getCookieSpec(FilterConfig filterConfig) {
     String cookieSpec = filterConfig.getInitParameter("httpclient.cookieSpec");
     if (StringUtils.isNotBlank(cookieSpec)) {
       return cookieSpec;
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
index 870d3f788..48196d137 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -71,13 +72,13 @@ public class GatewayDispatchFilter extends AbstractGatewayFilter {
     synchronized(lock) {
       if (dispatch == null) {
         String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
-        dispatch = newInstanceFromName(dispatchImpl);
+        dispatch = newInstanceFromName(dispatchImpl, filterConfig);
       }
       
ConfigurationInjectorBuilder.configuration().target(dispatch).source(filterConfig).inject();
       HttpClientFactory httpClientFactory;
       String httpClientFactoryClass = 
filterConfig.getInitParameter("httpClientFactory");
       if (httpClientFactoryClass != null) {
-        httpClientFactory = newInstanceFromName(httpClientFactoryClass);
+        httpClientFactory = newInstanceFromName(httpClientFactoryClass, 
filterConfig);
       } else {
         httpClientFactory = new DefaultHttpClientFactory();
       }
@@ -217,15 +218,28 @@ public class GatewayDispatchFilter extends AbstractGatewayFilter {
     }
   }
 
-  private <T> T newInstanceFromName(String dispatchImpl) throws 
ServletException {
+  private <T> T newInstanceFromName(String dispatchImpl, FilterConfig 
filterConfig) throws ServletException {
     try {
       Class<T> clazz = loadClass(dispatchImpl);
+      Constructor<?> constructor = this.getConstructorWithType(clazz, 
FilterConfig.class);
+      if(constructor != null) {
+        return (T) constructor.newInstance(filterConfig);
+      }
       return clazz.newInstance();
     } catch ( Exception e ) {
       throw new ServletException(e);
     }
   }
 
+  private <T> Constructor<?> getConstructorWithType(Class<T> clazz, Class<?> 
type) {
+    for (Constructor<?> constructor : clazz.getConstructors()) {
+      if (constructor.getParameterCount() == 1 && 
constructor.getParameterTypes()[0] == type) {
+        return constructor;
+      }
+    }
+    return null;
+  }
+
   private <T> Class<T> loadClass(String className) throws 
ClassNotFoundException {
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       if ( loader == null ) {
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java
new file mode 100644
index 000000000..c5180ec01
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/HttpAsyncClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.nio.client.HttpAsyncClient;
+
+import javax.servlet.FilterConfig;
+
+public interface HttpAsyncClientFactory extends HttpClientFactory {
+    HttpAsyncClient createAsyncHttpClient(FilterConfig filterConfig);
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
new file mode 100644
index 000000000..318c4bb82
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.doHttpMethod(httpGetRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPostRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPutRequest, inboundRequest, outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.doHttpMethod(httpPatchRequest, inboundRequest, outboundResponse);
+    }
+
+    private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest 
inboundRequest, HttpServletResponse outboundResponse) throws IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.executeAsyncRequest(producer, consumer, outboundRequest);
+    }
+
+    private void executeAsyncRequest(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
+        LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new 
FutureCallback<SSEResponse>() {
+
+            @Override
+            public void completed(final SSEResponse response) {
+                closeProducer(producer);
+                LOG.sseConnectionDone();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                closeProducer(producer);
+                LOG.sseConnectionError(ex.getMessage());
+            }
+
+            @Override
+            public void cancelled() {
+                closeProducer(producer);
+                LOG.sseConnectionCancelled();
+            }
+        });
+    }
+
+    private void addAcceptHeader(HttpUriRequest outboundRequest) {
+        outboundRequest.setHeader(HttpHeaders.ACCEPT, 
SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+    }
+
+    private void handleSuccessResponse(HttpServletResponse outboundResponse, 
URI url, HttpResponse inboundResponse) {
+        this.prepareServletResponse(outboundResponse, 
inboundResponse.getStatusLine().getStatusCode());
+        this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK));
+    }
+
+    private void handleErrorResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse httpResponse) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        outboundResponse.setStatus(statusCode);
+        LOG.dispatchResponseStatusCode(statusCode);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+    }
+
+    private void prepareServletResponse(HttpServletResponse outboundResponse, 
int statusCode) {
+        LOG.dispatchResponseStatusCode(statusCode);
+        outboundResponse.setStatus(statusCode);
+        outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+    }
+
+    private boolean isSuccessful(int statusCode) {
+        return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
+    }
+
+    private void closeProducer(HttpAsyncRequestProducer producer) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        }
+    }
+
+    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+        private SSEResponse sseResponse;
+        private final HttpServletResponse outboundResponse;
+        private final URI url;
+        private final AsyncContext asyncContext;
+
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext) {
+            this.outboundResponse = outboundResponse;
+            this.url = url;
+            this.asyncContext = asyncContext;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse inboundResponse) {
+            this.sseResponse = new SSEResponse(inboundResponse);
+            if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) 
{
+                handleSuccessResponse(outboundResponse, url, inboundResponse);
+            } else {
+                handleErrorResponse(outboundResponse, url, inboundResponse);
+            }
+        }
+
+        @Override
+        protected void onCharReceived(final CharBuffer buf, final IOControl 
ioctl) {
+            try {
+                if (this.sseResponse.getEntity().readCharBuffer(buf)) {
+                    this.sseResponse.getEntity().sendEvent(this.asyncContext);
+                }
+            } catch (InterruptedException | IOException e) {
+                LOG.errorWritingOutputStream(e);
+                throw new SSEException(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        protected void releaseResources() {
+            this.asyncContext.complete();
+        }
+
+        @Override
+        protected SSEResponse buildResult(final HttpContext context) {
+            return this.sseResponse;
+
+        }
+    }
+
+    @Override
+    public void destroy() {
+        try {
+            if (this.asyncClient != null && asyncClient instanceof 
CloseableHttpAsyncClient) {
+                ((CloseableHttpAsyncClient) asyncClient).close();
+            }
+        } catch (IOException e) {
+            LOG.errorClosingHttpClient(e);
+        }
+    }
+
+    public HttpAsyncClient getAsyncClient() {
+        return this.asyncClient;
+    }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
new file mode 100644
index 000000000..8b598c631
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SSEEntity extends AbstractHttpEntity {
+
+    private static final String SSE_DELIMITER = ":";
+
+    private final BlockingQueue<SSEvent> eventQueue;
+    private final StringBuilder eventBuilder = new StringBuilder();
+    private final HttpEntity httpEntity;
+    private char previousChar = '0';
+
+    public SSEEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        this.eventQueue = new LinkedBlockingQueue<>();
+    }
+
+    public boolean readCharBuffer(CharBuffer charBuffer) {
+        while (charBuffer.hasRemaining()) {
+            processChar(charBuffer.get());
+        }
+        return !eventQueue.isEmpty();
+    }
+
+    //Two new line chars (\n\n) after each other means the event is finished streaming
+    //We can process it and add it to the event queue
+    private void processChar(char nextChar) {
+        if (isNewLineChar(nextChar) && isNewLineChar(previousChar)) {
+            processEvent();
+            eventBuilder.setLength(0);
+            previousChar = '0';
+        } else {
+            eventBuilder.append(nextChar);
+            previousChar = nextChar;
+        }
+    }
+
+    private boolean isNewLineChar(char c) {
+        return c == '\n' || c == '\r' || c == '\u0085' || c == '\u2028' || c 
== '\u2029';
+    }
+
+    private void processEvent() {
+        String unprocessedEvent = eventBuilder.toString();
+        SSEvent ssEvent = new SSEvent();
+
+        for (String line : unprocessedEvent.split("\\R")) {
+            String[] lineTokens =  this.parseLine(line);
+            switch (lineTokens[0]) {
+                case "id":
+                    ssEvent.setId(lineTokens[1].trim());
+                    break;
+                case "event":
+                    ssEvent.setEvent(lineTokens[1].trim());
+                    break;
+                case "data":
+                    ssEvent.setData(lineTokens[1].trim());
+                    break;
+                case "comment":
+                    ssEvent.setComment(lineTokens[1].trim());
+                    break;
+                case "retry":
+                    ssEvent.setRetry(Long.parseLong(lineTokens[1].trim()));
+                    break;
+                default:
+                    break;
+            }
+        }
+        eventQueue.add(ssEvent);
+    }
+
+    private String[] parseLine(String line) {
+        String[] lineTokens = new String[2];
+        if(line.startsWith(SSE_DELIMITER)) {
+            lineTokens[0] = "comment";
+            lineTokens[1] = line;
+        } else {
+            lineTokens = line.split(SSE_DELIMITER, 2);
+        }
+
+        return lineTokens;
+    }
+
+    public void sendEvent(AsyncContext asyncContext) throws IOException, 
InterruptedException {
+        while (!eventQueue.isEmpty()) {
+            SSEvent event = eventQueue.take();
+            asyncContext.getResponse().getWriter().write(event.toString());
+            asyncContext.getResponse().getWriter().println('\n');
+
+            //Calling response.flushBuffer() instead of writer.flush().
+            //This way an exception is thrown if the connection is already closed on the client side.
+            asyncContext.getResponse().flushBuffer();
+        }
+    }
+
+    public BlockingQueue<SSEvent> getEventQueue() {
+        return eventQueue;
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return httpEntity.isRepeatable();
+    }
+
+    @Override
+    public long getContentLength() {
+        return httpEntity.getContentLength();
+    }
+
+    @Override
+    public InputStream getContent() throws IOException, 
UnsupportedOperationException {
+        return httpEntity.getContent();
+    }
+
+    @Override
+    public void writeTo(OutputStream outStream) throws IOException {
+        httpEntity.writeTo(outStream);
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return httpEntity.isStreaming();
+    }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java
new file mode 100644
index 000000000..a2bf9ae1b
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+/**
+ * Unchecked exception type of the {@code org.apache.knox.gateway.sse} package.
+ */
+public class SSEException extends RuntimeException {
+
+    /**
+     * Creates an exception with a detail message and the underlying cause.
+     *
+     * @param message description of the failure
+     * @param cause the throwable that triggered this exception
+     */
+    public SSEException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
new file mode 100644
index 000000000..47b7bfa16
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEResponse.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.Header;
+import org.apache.http.HeaderIterator;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.message.BasicHttpResponse;
+
+import java.util.Locale;
+
+/**
+ * An {@link HttpResponse} view over another response whose entity is exposed
+ * as an {@link SSEEntity}.
+ * <p>
+ * Status line, locale, protocol version and all header operations are
+ * forwarded to the wrapped {@code inboundResponse}; only {@link #getEntity()}
+ * is answered locally.
+ * <p>
+ * NOTE(review): {@code setEntity} is not overridden in this class, while
+ * {@link #getEntity()} always returns the entity created in the constructor,
+ * so replacing the entity through the inherited setter would presumably have
+ * no visible effect - confirm whether that is intended.
+ */
+public class SSEResponse extends BasicHttpResponse {
+    private final HttpResponse inboundResponse;
+    private final SSEEntity entity;
+
+    /**
+     * Wraps the given response and wraps its entity in a new {@link SSEEntity}.
+     *
+     * @param inboundResponse the response every other call is forwarded to
+     */
+    public SSEResponse(HttpResponse inboundResponse) {
+        super(inboundResponse.getStatusLine());
+        this.inboundResponse = inboundResponse;
+        this.entity = new SSEEntity(inboundResponse.getEntity());
+    }
+
+    /**
+     * Returns the {@link SSEEntity} created around the wrapped response's entity.
+     */
+    @Override
+    public SSEEntity getEntity() {
+        return entity;
+    }
+
+    // ---- status line, locale and protocol version: forwarded to inboundResponse ----
+
+    @Override
+    public StatusLine getStatusLine() {
+        return inboundResponse.getStatusLine();
+    }
+
+    @Override
+    public void setStatusLine(StatusLine statusline) {
+        inboundResponse.setStatusLine(statusline);
+    }
+
+    @Override
+    public void setStatusLine(ProtocolVersion ver, int code) {
+        inboundResponse.setStatusLine(ver, code);
+    }
+
+    @Override
+    public void setStatusLine(ProtocolVersion ver, int code, String reason) {
+        inboundResponse.setStatusLine(ver, code, reason);
+    }
+
+    @Override
+    public void setStatusCode(int code) throws IllegalStateException {
+        inboundResponse.setStatusCode(code);
+    }
+
+    @Override
+    public void setReasonPhrase(String reason) throws IllegalStateException {
+        inboundResponse.setReasonPhrase(reason);
+    }
+
+    @Override
+    public Locale getLocale() {
+        return inboundResponse.getLocale();
+    }
+
+    @Override
+    public void setLocale(Locale loc) {
+        inboundResponse.setLocale(loc);
+    }
+
+    @Override
+    public ProtocolVersion getProtocolVersion() {
+        return inboundResponse.getProtocolVersion();
+    }
+
+    // ---- header access and mutation: forwarded to inboundResponse ----
+
+    @Override
+    public boolean containsHeader(String name) {
+        return inboundResponse.containsHeader(name);
+    }
+
+    @Override
+    public Header[] getHeaders(String name) {
+        return inboundResponse.getHeaders(name);
+    }
+
+    @Override
+    public Header getFirstHeader(String name) {
+        return inboundResponse.getFirstHeader(name);
+    }
+
+    @Override
+    public Header getLastHeader(String name) {
+        return inboundResponse.getLastHeader(name);
+    }
+
+    @Override
+    public Header[] getAllHeaders() {
+        return inboundResponse.getAllHeaders();
+    }
+
+    @Override
+    public void addHeader(Header header) {
+        inboundResponse.addHeader(header);
+    }
+
+    @Override
+    public void addHeader(String name, String value) {
+        inboundResponse.addHeader(name, value);
+    }
+
+    @Override
+    public void setHeader(Header header) {
+        inboundResponse.setHeader(header);
+    }
+
+    @Override
+    public void setHeader(String name, String value) {
+        inboundResponse.setHeader(name, value);
+    }
+
+    @Override
+    public void setHeaders(Header[] headers) {
+        inboundResponse.setHeaders(headers);
+    }
+
+    @Override
+    public void removeHeader(Header header) {
+        inboundResponse.removeHeader(header);
+    }
+
+    @Override
+    public void removeHeaders(String name) {
+        inboundResponse.removeHeaders(name);
+    }
+
+    @Override
+    public HeaderIterator headerIterator() {
+        return inboundResponse.headerIterator();
+    }
+
+    @Override
+    public HeaderIterator headerIterator(String name) {
+        return inboundResponse.headerIterator(name);
+    }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
new file mode 100644
index 000000000..dc643ed0e
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+/**
+ * A single Server-Sent Event made of optional {@code id}, {@code event},
+ * {@code data}, {@code retry} and comment parts; any part may be {@code null}.
+ * <p>
+ * {@link #toString()} renders the non-null parts in event-stream line format.
+ */
+public class SSEvent {
+
+    private String data;
+    private String event;
+    private String id;
+    private String comment;
+    private Long retry;
+
+    /** Creates an event with every part unset ({@code null}). */
+    public SSEvent() {
+    }
+
+    /**
+     * Creates an event with all parts given explicitly.
+     *
+     * @param data    payload of the event, or {@code null}
+     * @param event   event type name, or {@code null}
+     * @param id      event id, or {@code null}
+     * @param comment comment text, written as-is by {@link #toString()}, or {@code null}
+     * @param retry   retry value, or {@code null}
+     */
+    public SSEvent(String data, String event, String id, String comment, Long retry) {
+        this.data = data;
+        this.event = event;
+        this.id = id;
+        this.comment = comment;
+        this.retry = retry;
+    }
+
+    public void setData(String data) {
+        this.data = data;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setComment(String comment) {
+        this.comment = comment;
+    }
+
+    public void setRetry(Long retry) {
+        this.retry = retry;
+    }
+
+    public String getData() {
+        return this.data;
+    }
+
+    public String getEvent() {
+        return this.event;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    public String getComment() {
+        return this.comment;
+    }
+
+    public Long getRetry() {
+        return this.retry;
+    }
+
+    /**
+     * Renders the event as newline-separated lines in the fixed order
+     * {@code id}, {@code event}, {@code data}, {@code retry}, comment.
+     * Parts that are {@code null} are skipped and no trailing newline is added.
+     * <p>
+     * Data containing {@code '\n'} is written as one {@code data:} line per
+     * segment, because a continuation line without the prefix would be read
+     * as a separate field by an event-stream parser. The comment is written
+     * without any prefix, so its value must already carry its leading
+     * {@code ':'}.
+     *
+     * @return the serialized event, or an empty string when every part is {@code null}
+     */
+    @Override
+    public String toString() {
+        StringBuilder eventString = new StringBuilder();
+
+        this.appendField(eventString, this.id, "id:");
+        this.appendField(eventString, this.event, "event:");
+        if (this.data != null) {
+            // Limit -1 keeps trailing empty segments, so no line is dropped.
+            for (String dataLine : this.data.split("\n", -1)) {
+                this.appendField(eventString, dataLine, "data:");
+            }
+        }
+        this.appendField(eventString, this.retry, "retry:");
+        this.appendField(eventString, this.comment, "");
+
+        return eventString.toString();
+    }
+
+    /**
+     * Appends {@code prefix + field} to the builder when {@code field} is not
+     * {@code null}, preceded by a newline unless the builder is still empty.
+     */
+    private void appendField(StringBuilder eventString, Object field, String prefix) {
+        if (field != null) {
+            if (eventString.length() != 0) {
+                eventString.append('\n');
+            }
+            eventString.append(prefix);
+            eventString.append(field);
+        }
+    }
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
new file mode 100644
index 000000000..001f830c2
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/dispatch/DefaultHttpAsyncClientFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.knox.gateway.dispatch;
+
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.junit.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultHttpAsyncClientFactoryTest {
+
+    @Test
+    public void testCreateHttpAsyncClientSSLContextDefaults() throws Exception 
{
+        KeystoreService keystoreService = createMock(KeystoreService.class);
+        
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+        GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+        expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+        
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+        
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+        GatewayServices gatewayServices = createMock(GatewayServices.class);
+        
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+        ServletContext servletContext = createMock(ServletContext.class);
+        
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+        
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+        FilterConfig filterConfig = createMock(FilterConfig.class);
+        
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+        
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+        
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+        replay(keystoreService, gatewayConfig, gatewayServices, 
servletContext, filterConfig);
+
+        DefaultHttpAsyncClientFactory factory = new 
DefaultHttpAsyncClientFactory();
+        HttpAsyncClient client = factory.createAsyncHttpClient(filterConfig);
+        assertNotNull(client);
+
+        verify(keystoreService, gatewayConfig, gatewayServices, 
servletContext, filterConfig);
+    }
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
new file mode 100644
index 000000000..66abcb791
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.test.mock.MockServer;
+import org.apache.knox.test.mock.MockServletContext;
+import org.apache.knox.test.mock.MockServletInputStream;
+import org.easymock.EasyMock;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SSEDispatchTest {
+
+    private static MockServer MOCK_SSE_SERVER;
+    private static URI URL;
+
+    /**
+     * Creates the shared mock SSE server and builds the target URL of the
+     * tests from the server's port.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        MOCK_SSE_SERVER = new MockServer("SSE", true);
+        URL = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse");
+    }
+
+    @Test
+    public void testCreateAndDestroyClient() throws Exception {
+        SSEDispatch sseDispatch = this.createDispatch();
+        assertNotNull(sseDispatch.getAsyncClient());
+
+        sseDispatch.destroy();
+        assertFalse(((CloseableHttpAsyncClient) 
sseDispatch.getAsyncClient()).isRunning());
+    }
+
+    @Test
+    public void testGet2xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("request", "header")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+
+        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testGet4xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_BAD_REQUEST);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_BAD_REQUEST);
+
+        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testGet5xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPost2xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("POST")
+                .pathInfo("/sse")
+                .header("request", "header")
+                .header("Accept", "text/event-stream")
+                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+
+        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPost4xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("POST")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_NOT_FOUND);
+
+        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPost5xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("POST")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPut2xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PUT")
+                .pathInfo("/sse")
+                .header("request", "header")
+                .header("Accept", "text/event-stream")
+                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+
+        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPut4xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PUT")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_NOT_FOUND);
+
+        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPut5xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PUT")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPatch2xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PATCH")
+                .pathInfo("/sse")
+                .header("request", "header")
+                .header("Accept", "text/event-stream")
+                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+
+        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPatch4xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PATCH")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_NOT_FOUND);
+
+        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    @Test
+    public void testPatch5xx() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("PATCH")
+                .pathInfo("/sse")
+                .respond()
+                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+    }
+
+    /**
+     * Dispatches a GET to a fixed local port instead of the mock server's
+     * port (per the test name, nothing is expected to listen there), waits up
+     * to one second for the async context to complete and then verifies the
+     * mocks.
+     */
+    @Test
+    public void testServerNotAvailable() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        SSEDispatch sseDispatch = this.createDispatch();
+        HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+        AsyncContext asyncContext = this.getAsyncContext(latch, outboundResponse);
+        HttpServletRequest inboundRequest = this.getHttpServletRequest(asyncContext);
+
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        sseDispatch.doGet(new URI("http://localhost:11223/sse"), inboundRequest, outboundResponse);
+
+        latch.await(1L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+    }
+
+    /**
+     * Creates a nice mock of the inbound request with its expectations recorded but not
+     * yet replayed: one "request" header with value "header", a JSON body, a single
+     * startAsync() call returning the given context, and a fresh MockServletContext.
+     *
+     * @param asyncContext the context handed back from startAsync()
+     * @return the request mock, still in record state
+     */
+    private HttpServletRequest getHttpServletRequest(AsyncContext asyncContext) throws Exception {
+        HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class);
+
+        // Header expectations.
+        Map<String, String> requestHeaders = new HashMap<>();
+        requestHeaders.put("request", "header");
+        expect(request.getHeaderNames()).andReturn(Collections.enumeration(requestHeaders.keySet())).anyTimes();
+        expect(request.startAsync()).andReturn(asyncContext).once();
+        expect(request.getHeader("request")).andReturn("header").once();
+
+        // Body expectations: the content length is whatever the stream reports as available.
+        byte[] bodyBytes = "{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8);
+        InputStream rawBody = new ByteArrayInputStream(bodyBytes);
+        MockServletInputStream servletBody = new MockServletInputStream(rawBody);
+        expect(request.getContentType()).andReturn("application/json").anyTimes();
+        expect(request.getInputStream()).andReturn(servletBody).anyTimes();
+        expect(request.getContentLength()).andReturn(servletBody.available()).anyTimes();
+
+        expect(request.getServletContext()).andReturn(new MockServletContext()).anyTimes();
+
+        return request;
+    }
+
+    /**
+     * Creates a nice mock AsyncContext (left in record state) that returns the given
+     * response and counts the latch down when complete() is called.
+     *
+     * @param latch            released from the recorded complete() answer
+     * @param outboundResponse returned from getResponse() any number of times
+     * @return the async context mock, not yet replayed
+     */
+    private AsyncContext getAsyncContext(CountDownLatch latch, HttpServletResponse outboundResponse) {
+        AsyncContext context = EasyMock.createNiceMock(AsyncContext.class);
+        expect(context.getResponse()).andReturn(outboundResponse).anyTimes();
+
+        // Record the void complete() call, then attach the latch-releasing answer to it.
+        context.complete();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            latch.countDown();
+            return null;
+        });
+
+        return context;
+    }
+
+    /**
+     * Creates a nice mock response (left in record state) on which a setStatus call
+     * with the given code has been recorded.
+     *
+     * @param statusCode the status code recorded on the mock
+     * @return the response mock, not yet replayed
+     */
+    private HttpServletResponse getServletResponse(int statusCode) {
+        HttpServletResponse response = EasyMock.createNiceMock(HttpServletResponse.class);
+
+        // Void method: invoke it in record state, then register the expectation.
+        response.setStatus(statusCode);
+        EasyMock.expectLastCall();
+
+        return response;
+    }
+
+    /**
+     * Records the header and body expectations shared by the tests: a "response: header"
+     * header on the response, the two serialized events written to the writer, and two
+     * println('\n') calls.
+     * NOTE(review): printWriter is presumably an EasyMock mock in record state - confirm at call sites.
+     *
+     * @param printWriter      writer returned from the response's getWriter()
+     * @param outboundResponse response mock in record state
+     */
+    private void expectResponseBodyAndHeader(PrintWriter printWriter, HttpServletResponse outboundResponse) throws Exception {
+        outboundResponse.addHeader("response", "header");
+        EasyMock.expectLastCall();
+        expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
+
+        String[] serializedEvents = {
+            "id:1\nevent:event1\ndata:data1",
+            "id:2\nevent:event2\ndata:data2\nretry:1\n:testing"
+        };
+        for (String serializedEvent : serializedEvents) {
+            printWriter.write(serializedEvent);
+            EasyMock.expectLastCall();
+        }
+
+        // One println('\n') is expected per event.
+        printWriter.println('\n');
+        EasyMock.expectLastCall().times(2);
+    }
+
+    /**
+     * Builds an SSEDispatch from a mocked FilterConfig. The servlet context behind it
+     * exposes a mocked GatewayConfig and GatewayServices, and every dispatch-specific
+     * init parameter except "useTwoWaySsl" is answered with null.
+     *
+     * @return a dispatch constructed from the replayed mocks
+     */
+    private SSEDispatch createDispatch() throws Exception {
+        // Keystore service that hands back no truststore.
+        KeystoreService keystore = EasyMock.createMock(KeystoreService.class);
+        EasyMock.expect(keystore.getTruststoreForHttpClient()).andReturn(null).once();
+
+        // Gateway-level HTTP client settings.
+        GatewayConfig config = EasyMock.createMock(GatewayConfig.class);
+        EasyMock.expect(config.isMetricsEnabled()).andReturn(false).once();
+        EasyMock.expect(config.getHttpClientMaxConnections()).andReturn(32).once();
+        EasyMock.expect(config.getHttpClientConnectionTimeout()).andReturn(20000).once();
+        EasyMock.expect(config.getHttpClientSocketTimeout()).andReturn(20000).once();
+        EasyMock.expect(config.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+        GatewayServices services = EasyMock.createMock(GatewayServices.class);
+        EasyMock.expect(services.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystore).once();
+
+        // Servlet context exposing the config and services under their attribute names.
+        ServletContext context = EasyMock.createMock(ServletContext.class);
+        EasyMock.expect(context.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(config).atLeastOnce();
+        EasyMock.expect(context.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(services).atLeastOnce();
+
+        // Filter init parameters.
+        FilterConfig filter = EasyMock.createMock(FilterConfig.class);
+        EasyMock.expect(filter.getServletContext()).andReturn(context).atLeastOnce();
+        EasyMock.expect(filter.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+        EasyMock.expect(filter.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+        EasyMock.expect(filter.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+        EasyMock.expect(filter.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+        EasyMock.expect(filter.getInitParameter("serviceRole")).andReturn(null).once();
+        EasyMock.expect(filter.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+        EasyMock.replay(keystore, config, services, context, filter);
+
+        return new SSEDispatch(filter);
+    }
+}
\ No newline at end of file
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
new file mode 100644
index 000000000..1c0375d9a
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests which {@link SSEvent}s end up on an {@link SSEEntity}'s event queue after raw
+ * event-stream text is passed to {@code readCharBuffer}.
+ */
+public class SSEEntityTest {
+
+    private final HttpEntity entityMock = EasyMock.createNiceMock(HttpEntity.class);
+
+    /**
+     * Feeds the given raw text to a fresh SSEEntity in a single readCharBuffer call.
+     *
+     * @param rawEvents the unparsed event-stream text
+     * @return the entity's event queue after the text has been read
+     */
+    private BlockingQueue<SSEvent> parse(String rawEvents) {
+        SSEEntity sseEntity = new SSEEntity(entityMock);
+        BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+        sseEntity.readCharBuffer(CharBuffer.wrap(rawEvents));
+        return eventQueue;
+    }
+
+    /** A single complete event with all five fields is parsed into one SSEvent. */
+    @Test
+    public void testParseSingleEvent() {
+        BlockingQueue<SSEvent> eventQueue = parse("id: 1\nevent: event\ndata: data\nretry:1\n:testing\n\n");
+
+        assertFalse(eventQueue.isEmpty());
+
+        SSEvent actualSSEvent = eventQueue.peek();
+        assertEquals("1", actualSSEvent.getId());
+        assertEquals("event", actualSSEvent.getEvent());
+        assertEquals("data", actualSSEvent.getData());
+        assertEquals(1L, actualSSEvent.getRetry().longValue());
+        assertEquals(":testing", actualSSEvent.getComment());
+    }
+
+    /** Three events separated by blank lines are queued in order; absent fields are null. */
+    @Test
+    public void testParseMultipleEvents() {
+        BlockingQueue<SSEvent> eventQueue = parse("id: 1\nevent: event\ndata: data\n\n"
+                + "id: 2\nevent: event2\ndata: data2\n\n"
+                + "id: 3\nevent: event3\ndata: data3\nretry:1045\n:TEST\n\n");
+
+        assertEquals(3, eventQueue.size());
+
+        SSEvent actualSSEvent = eventQueue.poll();
+        assertEquals("1", actualSSEvent.getId());
+        assertEquals("event", actualSSEvent.getEvent());
+        assertEquals("data", actualSSEvent.getData());
+        assertNull(actualSSEvent.getRetry());
+        assertNull(actualSSEvent.getComment());
+
+        actualSSEvent = eventQueue.poll();
+        assertEquals("2", actualSSEvent.getId());
+        assertEquals("event2", actualSSEvent.getEvent());
+        assertEquals("data2", actualSSEvent.getData());
+        assertNull(actualSSEvent.getRetry());
+        assertNull(actualSSEvent.getComment());
+
+        actualSSEvent = eventQueue.poll();
+        assertEquals("3", actualSSEvent.getId());
+        assertEquals("event3", actualSSEvent.getEvent());
+        assertEquals("data3", actualSSEvent.getData());
+        assertEquals(1045L, actualSSEvent.getRetry().longValue());
+        assertEquals(":TEST", actualSSEvent.getComment());
+    }
+
+    /** Text ending in a single newline (no terminating blank line) queues nothing. */
+    @Test
+    public void testMissingNewLine() {
+        BlockingQueue<SSEvent> eventQueue = parse("id: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n");
+
+        assertTrue(eventQueue.isEmpty());
+    }
+
+    /** Colons inside the id, data and comment values are kept as part of the value. */
+    @Test
+    public void testParseEventWithSpecialChars() {
+        String expectedData = "data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}";
+        BlockingQueue<SSEvent> eventQueue = parse("id: 75a0a510-0065-498f:be39-c6f42a3fe4af\ndata: "
+                + expectedData + "\nretry:33\n::::test\n\n");
+
+        assertFalse(eventQueue.isEmpty());
+
+        SSEvent actualSSEvent = eventQueue.peek();
+        assertEquals("75a0a510-0065-498f:be39-c6f42a3fe4af", actualSSEvent.getId());
+        assertNull(actualSSEvent.getEvent());
+        assertEquals(expectedData, actualSSEvent.getData());
+        assertEquals(33L, actualSSEvent.getRetry().longValue());
+        assertEquals("::::test", actualSSEvent.getComment());
+    }
+
+    /** Two sets of fields run together without any blank line queue nothing. */
+    @Test
+    public void testInvalidFormat() {
+        BlockingQueue<SSEvent> eventQueue = parse("id: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n"
+                + "id: 1\nevent: event\ndata: data\nretry:1045\n:TEST\n");
+
+        assertTrue(eventQueue.isEmpty());
+    }
+
+    /**
+     * Events terminated by CR LF, or by U+2028, U+2029 or U+0085 followed by LF, are each
+     * expected to be queued as a separate event.
+     */
+    @Test
+    public void testParseEventsWithDifferentNewLineChars() {
+        BlockingQueue<SSEvent> eventQueue = parse("id: 1\nevent: event\ndata: data\r\n"
+                + "id: 2\nevent: event2\ndata: data2\u2028\n"
+                + "id: 3\nevent: event3\ndata: data3\u2029\n"
+                + "id: 4\nevent: event4\ndata: data4\u0085\n");
+
+        assertEquals(4, eventQueue.size());
+
+        SSEvent actualSSEvent = eventQueue.poll();
+        assertEquals("1", actualSSEvent.getId());
+        assertEquals("event", actualSSEvent.getEvent());
+        assertEquals("data", actualSSEvent.getData());
+
+        actualSSEvent = eventQueue.poll();
+        assertEquals("2", actualSSEvent.getId());
+        assertEquals("event2", actualSSEvent.getEvent());
+        assertEquals("data2", actualSSEvent.getData());
+
+        actualSSEvent = eventQueue.poll();
+        assertEquals("3", actualSSEvent.getId());
+        assertEquals("event3", actualSSEvent.getEvent());
+        assertEquals("data3", actualSSEvent.getData());
+
+        actualSSEvent = eventQueue.poll();
+        assertEquals("4", actualSSEvent.getId());
+        assertEquals("event4", actualSSEvent.getEvent());
+        assertEquals("data4", actualSSEvent.getData());
+    }
+}
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
new file mode 100644
index 000000000..378726ab3
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the text produced by {@link SSEvent#toString()} when individual fields are
+ * present, null or empty. Judging by the literals used below, the constructor arguments
+ * are (data, event, id, comment, retry).
+ */
+public class SSEventTest {
+
+    /** All five fields set: rendered in the order id, event, data, retry, comment. */
+    @Test
+    public void testToStringWithAll() {
+        SSEvent ssEvent = new SSEvent("data", "event", "id", ":comment", 5L);
+        String expected = "id:id\nevent:event\ndata:data\nretry:5\n:comment";
+
+        assertEquals(expected, ssEvent.toString());
+    }
+
+    /** A null id is left out entirely, while an empty id is rendered as "id:". */
+    @Test
+    public void testToStringNoId() {
+        SSEvent ssEventNull = new SSEvent("data", "event", null, ":test", 2L);
+        SSEvent ssEventEmpty = new SSEvent("data", "event", "", ":new comment", 1L);
+        String expectedForNull = "event:event\ndata:data\nretry:2\n:test";
+        String expectedForEmpty = "id:\nevent:event\ndata:data\nretry:1\n:new comment";
+
+        assertEquals(expectedForNull, ssEventNull.toString());
+        assertEquals(expectedForEmpty, ssEventEmpty.toString());
+    }
+
+    /** A null event name is left out entirely, while an empty one is rendered as "event:". */
+    @Test
+    public void testToStringNoEvent() {
+        SSEvent ssEventNull = new SSEvent("data", null, "id", ":comment", 11L);
+        SSEvent ssEventEmpty = new SSEvent("data", "", "id", ":test comment", 30L);
+        String expectedForNull = "id:id\ndata:data\nretry:11\n:comment";
+        String expectedForEmpty = "id:id\nevent:\ndata:data\nretry:30\n:test comment";
+
+        assertEquals(expectedForNull, ssEventNull.toString());
+        assertEquals(expectedForEmpty, ssEventEmpty.toString());
+    }
+
+    /** Null data is left out entirely, while empty data is rendered as "data:". */
+    @Test
+    public void testToStringNoData() {
+        SSEvent ssEventNull = new SSEvent(null, "event", "id", ":comment", 2L);
+        SSEvent ssEventEmpty = new SSEvent("", "event", "id", ":testing", 1L);
+        String expectedForNull = "id:id\nevent:event\nretry:2\n:comment";
+        String expectedForEmpty = "id:id\nevent:event\ndata:\nretry:1\n:testing";
+
+        assertEquals(expectedForNull, ssEventNull.toString());
+        assertEquals(expectedForEmpty, ssEventEmpty.toString());
+    }
+
+    /** A null comment produces no trailing comment line. */
+    @Test
+    public void testToStringNoComment() {
+        SSEvent ssEventNull = new SSEvent("data", "event", "id", null, 3L);
+        String expected = "id:id\nevent:event\ndata:data\nretry:3";
+
+        assertEquals(expected, ssEventNull.toString());
+    }
+
+    /** A null retry produces no "retry:" line. */
+    @Test
+    public void testToStringNoRetry() {
+        SSEvent ssEventNull = new SSEvent("data", "event", "id", ":TEST", null);
+        String expected = "id:id\nevent:event\ndata:data\n:TEST";
+
+        assertEquals(expected, ssEventNull.toString());
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 084b6098b..75bd5155f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,7 @@
         <ant-nodeps.version>1.8.1</ant-nodeps.version>
         <asm.version>9.0</asm.version>
         <aspectj.version>1.9.6</aspectj.version>
+        <asynchttpclient.version>4.1.5</asynchttpclient.version>
         <bcprov-jdk15on.version>1.67</bcprov-jdk15on.version>
         <ben-manes.caffeine.version>2.8.8</ben-manes.caffeine.version>
         
<buildnumber-maven-plugin.version>1.4</buildnumber-maven-plugin.version>
@@ -1549,6 +1550,16 @@
                 <artifactId>httpcore</artifactId>
                 <version>${httpcore.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpasyncclient</artifactId>
+                <version>${asynchttpclient.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpcore-nio</artifactId>
+                <version>${httpcore.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>joda-time</groupId>

Reply via email to