Tuomas Kiviaho created CAMEL-9370:
-------------------------------------
Summary: Support for async/deferred content
Key: CAMEL-9370
URL: https://issues.apache.org/jira/browse/CAMEL-9370
Project: Camel
Issue Type: Improvement
Components: camel-jetty
Affects Versions: 2.16.1
Reporter: Tuomas Kiviaho
I'm receiving {{text/event-stream}} and, to my surprise, there was no support of
any kind for {{AsyncContentProvider}}.
Hence here are my dealings with the issue in the form of a patch. Some of the
bits would work out of the box as-is, but others (such as {{jettyBinding
instanceof Response.ResponseListener}}) I've rammed in just to get it working.
It would be nice if this type of streaming could be enabled without having to
declare {{jettyBinding}}. A new option, for instance, would be a more practical
solution.
{code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java}
@@ -191,7 +191,8 @@
}
} else {
// just grab the raw content body
- return httpExchange.getBody();
+ byte[] body = httpExchange.getBody();
+ return body == null ? httpExchange.getResponseContentProvider() :
body;
}
}
{code}
{code:title=org/apache/camel/component/jetty/JettyContentExchange.java}
@@ -25,6 +25,7 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
public interface JettyContentExchange {
@@ -44,6 +45,8 @@
void setRequestContent(String data, String charset) throws
UnsupportedEncodingException;
void setRequestContent(InputStream ins);
+
+ void setRequestContent(ContentProvider contentProvider);
void addRequestHeader(String key, String s);
@@ -63,6 +66,8 @@
int getResponseStatus();
byte[] getResponseContentBytes();
+
+ ContentProvider getResponseContentProvider();
Map<String, Collection<String>> getResponseHeaders();
{code}
{code:title=org/apache/camel/component/jetty/JettyHttpProducer.java}
@@ -32,7 +32,6 @@
import org.apache.camel.Message;
import org.apache.camel.http.common.HttpConstants;
import org.apache.camel.http.common.HttpHelper;
-import org.apache.camel.http.common.HttpMethods;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ExchangeHelper;
@@ -40,6 +39,7 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,6 +160,9 @@
// (for example application/x-www-form-urlencoded forms
being sent)
String charset = IOHelper.getCharsetName(exchange, false);
httpExchange.setRequestContent(data, charset);
+ } else if (body instanceof ContentProvider) {
+ ContentProvider contentProvider = (ContentProvider) body;
+ httpExchange.setRequestContent(contentProvider);
} else {
// then fallback to input stream
InputStream is =
exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class,
exchange, exchange.getIn().getBody());
{code}
{code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java}
@@ -21,6 +21,7 @@
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
@@ -35,14 +36,18 @@
import org.apache.camel.component.jetty.JettyContentExchange;
import org.apache.camel.component.jetty.JettyHttpBinding;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.Synchronizable;
+import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +65,7 @@
private final CountDownLatch done = new CountDownLatch(1);
private Request request;
private Response response;
- private byte[] responseContent;
+ private Object responseContent;
private String requestContentType;
@@ -183,16 +188,20 @@
}
public void setRequestContent(byte[] byteArray) {
- this.request.content(new BytesContentProvider(byteArray),
this.requestContentType);
+ this.setRequestContent(new BytesContentProvider(byteArray));
}
public void setRequestContent(String data, String charset) throws
UnsupportedEncodingException {
StringContentProvider cp = charset != null ? new
StringContentProvider(data, charset) : new StringContentProvider(data);
- this.request.content(cp, this.requestContentType);
+ this.setRequestContent(cp);
}
public void setRequestContent(InputStream ins) {
- this.request.content(new InputStreamContentProvider(ins),
this.requestContentType);
+ this.setRequestContent(new InputStreamContentProvider(ins));
+ }
+
+ public void setRequestContent(ContentProvider contentProvider) {
+ this.request.content(contentProvider, this.requestContentType);
}
public void addRequestHeader(String key, String s) {
@@ -213,7 +222,66 @@
}
};
- BufferingResponseListener responseListener = new
BufferingResponseListener() {
+ Response.CompleteListener responseListener = jettyBinding instanceof
Response.ResponseListener ? new Response.Listener.Adapter() {
+
+ @Override
+ public void onHeaders(Response response) {
+ LOG.trace("onResponseComplete");
+ done.countDown();
+ JettyContentExchange9.this.response = response;
+ JettyContentExchange9.this.responseContent = new
DeferredContentProvider() {
+
+ @Override
+ public long getLength()
+ {
+ return -1;
+ }
+
+ };
+ try {
+ jettyBinding.populateResponse(exchange,
JettyContentExchange9.this);
+ } catch (Exception e) {
+ exchange.setException(e);
+ } finally {
+ JettyContentExchange9.this.callback.done(false);
+ }
+ }
+
+ @Override
+ public void onContent(Response response, ByteBuffer content,
Callback callback) {
+ DeferredContentProvider deferredContentProvider =
(DeferredContentProvider) JettyContentExchange9.this.responseContent;
+ if (!deferredContentProvider.offer(content, callback)) {
+ Synchronizable synchronizable = (Synchronizable)
JettyContentExchange9.this.responseContent;
+ Object lock = synchronizable.getLock();
+ synchronized (lock) {
+ if (!deferredContentProvider.offer(content, callback))
{
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ callback.failed(e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure)
+ {
+ doTaskCompleted(failure);
+ }
+
+ @Override
+ public void onComplete(Result result) {
+ DeferredContentProvider contentProvider =
(DeferredContentProvider) JettyContentExchange9.this.responseContent;
+ if (result.isSucceeded()) {
+ contentProvider.close();
+ } else {
+ contentProvider.failed(result.getFailure());
+ }
+ }
+
+ } : new BufferingResponseListener() {
@Override
public void onComplete(Result result) {
@@ -232,7 +300,11 @@
}
public byte[] getResponseContentBytes() {
- return responseContent;
+ return (byte[]) responseContent;
+ }
+
+ public ContentProvider getResponseContentProvider() {
+ return (ContentProvider) responseContent;
}
private Map<String, Collection<String>> getFieldsAsMap(HttpFields fields) {
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)