[
https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942744
]
ASF GitHub Bot logged work on KNOX-3065:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Nov/24 09:55
Start Date: 08/Nov/24 09:55
Worklog Time Spent: 10m
Work Description: hanicz commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1834046591
##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+ private final HttpAsyncClient asyncClient;
+ private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+ public SSEDispatch(FilterConfig filterConfig) {
+ HttpAsyncClientFactory asyncClientFactory = new
DefaultHttpAsyncClientFactory();
+ this.asyncClient =
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+ if (asyncClient instanceof CloseableHttpAsyncClient) {
+ ((CloseableHttpAsyncClient) this.asyncClient).start();
+ }
+ }
+
+ @Override
+ public void doGet(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpGet httpGetRequest = new HttpGet(url);
+ this.commonBaseMethod(httpGetRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPost(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException, URISyntaxException {
+ final HttpPost httpPostRequest = new HttpPost(url);
+ httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPostRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPut(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPut httpPutRequest = new HttpPut(url);
+ httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPutRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPatch(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPatch httpPatchRequest = new HttpPatch(url);
+ httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPatchRequest, inboundRequest,
outboundResponse);
+ }
+
+ private void commonBaseMethod(HttpUriRequest httpMethod,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws
IOException {
Review Comment:
Changed it.
##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+ private final HttpAsyncClient asyncClient;
+ private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+ public SSEDispatch(FilterConfig filterConfig) {
+ HttpAsyncClientFactory asyncClientFactory = new
DefaultHttpAsyncClientFactory();
+ this.asyncClient =
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+ if (asyncClient instanceof CloseableHttpAsyncClient) {
+ ((CloseableHttpAsyncClient) this.asyncClient).start();
+ }
+ }
+
+ @Override
+ public void doGet(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpGet httpGetRequest = new HttpGet(url);
+ this.commonBaseMethod(httpGetRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPost(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException, URISyntaxException {
+ final HttpPost httpPostRequest = new HttpPost(url);
+ httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPostRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPut(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPut httpPutRequest = new HttpPut(url);
+ httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPutRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPatch(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPatch httpPatchRequest = new HttpPatch(url);
+ httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPatchRequest, inboundRequest,
outboundResponse);
+ }
+
+ private void commonBaseMethod(HttpUriRequest httpMethod,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws
IOException {
+ this.addAcceptHeader(httpMethod);
+ this.copyRequestHeaderFields(httpMethod, inboundRequest);
+ this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+ }
+
+ private void executeRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+ AsyncContext asyncContext = inboundRequest.startAsync();
+ //No timeout
+ asyncContext.setTimeout(0L);
+
+ HttpAsyncRequestProducer producer =
HttpAsyncMethods.create(outboundRequest);
+ AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+ this.getSSEConnection(producer, consumer, outboundRequest);
+ }
+
+ private void getSSEConnection(HttpAsyncRequestProducer producer,
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
Review Comment:
Changed it.
Issue Time Tracking
-------------------
Worklog Id: (was: 942744)
Time Spent: 1h 40m (was: 1.5h)
> Support Server Sent Events (SSE) in Knox
> ----------------------------------------
>
> Key: KNOX-3065
> URL: https://issues.apache.org/jira/browse/KNOX-3065
> Project: Apache Knox
> Issue Type: New Feature
> Components: Server
> Affects Versions: 2.1.0
> Reporter: Tamás Hanicz
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Currently Knox is not working with SSE. In case of SSE the response header
> contains Content-Type = text/event-stream, in which case the server will send
> messages terminated by \n\n. Knox should send these messages to the client as
> they arrive. Currently Knox collects them all, and once the server closes the
> connection sends all of them concatenated to each other to the client.
> Requirement:
> * Full support in Knox for Server Sent Events.
> Definition of done:
> * Clients can connect to SSE endpoints in the backend via Knox.
> * Server Sent Events
> ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events])
> are forwarded by Knox from the server to the client as soon as they are
> received.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)