This is an automated email from the ASF dual-hosted git repository.
siano pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 684ef57 CAMEL-12227 - upgrade to AHC 2.3.0
684ef57 is described below
commit 684ef5786838e051cece2f510d9dafabdc310d02
Author: Stephan Siano <[email protected]>
AuthorDate: Thu Feb 8 10:24:10 2018 +0100
CAMEL-12227 - upgrade to AHC 2.3.0
---
.../apache/camel/component/ahc/ws/WsEndpoint.java | 12 +++++-----
.../apache/camel/component/ahc/ws/WsProducer.java | 18 +++++++-------
.../org/apache/camel/component/ahc/AhcBinding.java | 5 ++--
.../apache/camel/component/ahc/AhcEndpoint.java | 11 ++++++---
.../apache/camel/component/ahc/AhcProducer.java | 6 ++---
.../camel/component/ahc/DefaultAhcBinding.java | 28 ++++++++++------------
.../component/atmosphere/websocket/TestClient.java | 17 +++++++------
parent/pom.xml | 2 +-
8 files changed, 51 insertions(+), 48 deletions(-)
diff --git
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 29a593b..10da4ac 100644
---
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -29,8 +29,8 @@ import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
-import org.asynchttpclient.ws.DefaultWebSocketListener;
import org.asynchttpclient.ws.WebSocket;
+import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +133,7 @@ public class WsEndpoint extends AhcEndpoint {
LOG.debug("Disconnecting from {}",
getHttpUri().toASCIIString());
}
websocket.removeWebSocketListener(listener);
- websocket.close();
+ websocket.sendCloseFrame();
websocket = null;
}
super.doStop();
@@ -156,7 +156,7 @@ public class WsEndpoint extends AhcEndpoint {
}
}
- class WsListener extends DefaultWebSocketListener {
+ class WsListener implements WebSocketListener {
@Override
public void onOpen(WebSocket websocket) {
@@ -164,7 +164,7 @@ public class WsEndpoint extends AhcEndpoint {
}
@Override
- public void onClose(WebSocket websocket) {
+ public void onClose(WebSocket websocket, int code, String reason) {
LOG.debug("websocket closed - reconnecting");
try {
reConnect();
@@ -184,7 +184,7 @@ public class WsEndpoint extends AhcEndpoint {
}
@Override
- public void onMessage(byte[] message) {
+ public void onBinaryFrame(byte[] message, boolean finalFragment, int
rsv) {
LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
@@ -192,7 +192,7 @@ public class WsEndpoint extends AhcEndpoint {
}
@Override
- public void onMessage(String message) {
+ public void onTextFrame(String message, boolean finalFragment, int
rsv) {
LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
diff --git
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
index 5ba9759..347f8d6 100644
---
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
+++
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
@@ -55,7 +55,7 @@ public class WsProducer extends DefaultProducer {
sendStreamMessage(getWebSocket(), (InputStream)message);
} else {
//TODO provide other binding option, for now use the converted
string
- getWebSocket().sendMessage(in.getMandatoryBody(String.class));
+
getWebSocket().sendTextFrame(in.getMandatoryBody(String.class));
}
}
}
@@ -65,15 +65,15 @@ public class WsProducer extends DefaultProducer {
int p = 0;
while (p < msg.length()) {
if (msg.length() - p < streamBufferSize) {
- webSocket.stream(msg.substring(p), true);
+ webSocket.sendTextFrame(msg.substring(p), true, 0);
p = msg.length();
} else {
- webSocket.stream(msg.substring(p, streamBufferSize),
false);
+ webSocket.sendTextFrame(msg.substring(p,
streamBufferSize), false, 0);
p += streamBufferSize;
}
}
} else {
- webSocket.sendMessage(msg);
+ webSocket.sendTextFrame(msg);
}
}
@@ -89,20 +89,20 @@ public class WsProducer extends DefaultProducer {
System.arraycopy(msg, p, writebuf, 0, rest);
byte[] tmpbuf = new byte[rest];
System.arraycopy(writebuf, 0, tmpbuf, 0, rest);
- webSocket.stream(tmpbuf, 0, rest, true);
+ webSocket.sendBinaryFrame(tmpbuf, true, 0);
// ends
p = msg.length;
} else {
// bug in grizzly? we need to create a byte array with the
exact length
//webSocket.stream(msg, p, streamBufferSize, false);
System.arraycopy(msg, p, writebuf, 0, streamBufferSize);
- webSocket.stream(writebuf, 0, streamBufferSize, false);
+ webSocket.sendBinaryFrame(writebuf, false, 0);
// ends
p += streamBufferSize;
}
}
} else {
- webSocket.sendMessage(msg);
+ webSocket.sendBinaryFrame(msg);
}
}
@@ -114,7 +114,7 @@ public class WsProducer extends DefaultProducer {
try {
while ((rn = in.read(readbuf, 0, readbuf.length)) != -1) {
if (wn > 0) {
- webSocket.stream(writebuf, 0, writebuf.length, false);
+ webSocket.sendBinaryFrame(writebuf, false, 0);
}
System.arraycopy(readbuf, 0, writebuf, 0, rn);
wn = rn;
@@ -125,7 +125,7 @@ public class WsProducer extends DefaultProducer {
writebuf = new byte[wn];
System.arraycopy(tmpbuf, 0, writebuf, 0, wn);
} // ends
- webSocket.stream(writebuf, 0, wn, true);
+ webSocket.sendBinaryFrame(writebuf, true, 0);
} finally {
in.close();
}
diff --git
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java
index 189d364..b6d940f 100644
---
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java
+++
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java
@@ -19,10 +19,11 @@ package org.apache.camel.component.ahc;
import java.io.ByteArrayOutputStream;
import org.apache.camel.Exchange;
-import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Request;
+import io.netty.handler.codec.http.HttpHeaders;
+
/**
* Binding from Camel to/from {@link com.ning.http.client.AsyncHttpClient}
*/
@@ -66,7 +67,7 @@ public interface AhcBinding {
* @param headers the HTTP headers
* @throws Exception is thrown if error occurred in the callback
*/
- void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange,
HttpResponseHeaders headers) throws Exception;
+ void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange,
HttpHeaders headers) throws Exception;
/**
* Callback from the {@link com.ning.http.client.AsyncHttpClient} when
complete and all the response has been received.
diff --git
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
index 54dc8a3..610be59 100644
---
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
+++
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
@@ -280,7 +280,7 @@ public class AhcEndpoint extends DefaultEndpoint implements
AsyncEndpoint, Heade
super.doStart();
if (client == null) {
- AsyncHttpClientConfig config = null;
+ AsyncHttpClientConfig config;
if (clientConfig != null) {
DefaultAsyncHttpClientConfig.Builder builder =
AhcComponent.cloneConfig(clientConfig);
@@ -293,13 +293,18 @@ public class AhcEndpoint extends DefaultEndpoint
implements AsyncEndpoint, Heade
config = builder.build();
} else {
+ DefaultAsyncHttpClientConfig.Builder builder = new
DefaultAsyncHttpClientConfig.Builder();
+ /*
+ * Not doing this will always create a cookie handler per
endpoint, which is incompatible
+ * to prior versions and interferes with the cookie handling
in camel
+ */
+ builder.setCookieStore(null);
if (sslContextParameters != null) {
- DefaultAsyncHttpClientConfig.Builder builder = new
DefaultAsyncHttpClientConfig.Builder();
SSLContext sslContext =
sslContextParameters.createSSLContext(getCamelContext());
JdkSslContext ssl = new JdkSslContext(sslContext, true,
ClientAuth.REQUIRE);
builder.setSslContext(ssl);
- config = builder.build();
}
+ config = builder.build();
}
client = createClient(config);
}
diff --git
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
index 887a458..dbec1a0 100644
---
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
+++
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
@@ -24,10 +24,11 @@ import org.apache.camel.impl.DefaultAsyncProducer;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
-import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Request;
+import io.netty.handler.codec.http.HttpHeaders;
+
/**
*
*/
@@ -144,7 +145,7 @@ public class AhcProducer extends DefaultAsyncProducer {
}
@Override
- public State onHeadersReceived(HttpResponseHeaders headers) throws
Exception {
+ public State onHeadersReceived(HttpHeaders headers) throws Exception {
if (log.isTraceEnabled()) {
log.trace("{} onHeadersReceived {}", exchange.getExchangeId(),
headers);
}
@@ -156,5 +157,4 @@ public class AhcProducer extends DefaultAsyncProducer {
return State.CONTINUE;
}
}
-
}
diff --git
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java
index 2e5c3d9..86466fa 100644
---
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java
+++
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java
@@ -26,10 +26,8 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.camel.CamelExchangeException;
@@ -43,17 +41,17 @@ import org.apache.camel.util.GZIPHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
-import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.generator.BodyGenerator;
-import org.asynchttpclient.request.body.generator.ByteArrayBodyGenerator;
import org.asynchttpclient.request.body.generator.FileBodyGenerator;
import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.handler.codec.http.HttpHeaders;
+
public class DefaultAhcBinding implements AhcBinding {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -167,7 +165,7 @@ public class DefaultAhcBinding implements AhcBinding {
ByteArrayOutputStream bos = new
ByteArrayOutputStream(endpoint.getBufferSize());
AhcHelper.writeObjectToStream(bos, obj);
byte[] bytes = bos.toByteArray();
- body = new ByteArrayBodyGenerator(bytes);
+ body = new InputStreamBodyGenerator(new
ByteArrayInputStream(bytes));
IOHelper.close(bos);
} else if (data instanceof File || data instanceof
GenericFile) {
// file based (could potentially also be a FTP file
etc)
@@ -181,9 +179,9 @@ public class DefaultAhcBinding implements AhcBinding {
// do not fallback to use the default charset as it
can influence the request
// (for example application/x-www-form-urlencoded
forms being sent)
if (charset != null) {
- body = new ByteArrayBodyGenerator(((String)
data).getBytes(charset));
+ body = new InputStreamBodyGenerator(new
ByteArrayInputStream(((String) data).getBytes(charset)));
} else {
- body = new ByteArrayBodyGenerator(((String)
data).getBytes());
+ body = new InputStreamBodyGenerator(new
ByteArrayInputStream(((String) data).getBytes()));
}
}
// fallback as input stream
@@ -232,16 +230,16 @@ public class DefaultAhcBinding implements AhcBinding {
}
@Override
- public void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange,
HttpResponseHeaders headers) throws Exception {
+ public void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange,
HttpHeaders headers) throws Exception {
Map<String, List<String>> m = new TreeMap<String,
List<String>>(String.CASE_INSENSITIVE_ORDER);
- for (Entry<String, String> entry : headers.getHeaders().entries()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (!m.containsKey(key)) {
- m.put(key, new LinkedList<String>());
- exchange.getOut().getHeaders().put(key, value);
+ for (String name:headers.names()) {
+ List<String> values = headers.getAll(name);
+ if (values.size() == 1) {
+ exchange.getOut().getHeaders().put(name, values.get(0));
+ } else {
+ exchange.getOut().getHeaders().put(name, values);
}
- m.get(key).add(value);
+ m.put(name, values);
}
// handle cookies
if (endpoint.getCookieHandler() != null) {
diff --git
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java
index e9ee2ce..ec75e7b 100644
---
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java
+++
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java
@@ -28,8 +28,7 @@ import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.ws.WebSocket;
-import org.asynchttpclient.ws.WebSocketByteListener;
-import org.asynchttpclient.ws.WebSocketTextListener;
+import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +68,11 @@ public class TestClient {
}
public void sendTextMessage(String message) {
- websocket.sendMessage(message);
+ websocket.sendTextFrame(message);
}
public void sendBytesMessage(byte[] message) {
- websocket.sendMessage(message);
+ websocket.sendBinaryFrame(message);
}
public boolean await(int secs) throws InterruptedException {
@@ -116,11 +115,11 @@ public class TestClient {
}
public void close() throws IOException {
- websocket.close();
+ websocket.sendCloseFrame();
client.close();
}
- private class TestWebSocketListener implements WebSocketTextListener,
WebSocketByteListener {
+ private class TestWebSocketListener implements WebSocketListener {
@Override
public void onOpen(WebSocket websocket) {
@@ -128,7 +127,7 @@ public class TestClient {
}
@Override
- public void onClose(WebSocket websocket) {
+ public void onClose(WebSocket websocket, int code, String reason) {
LOG.info("[ws] closed");
}
@@ -138,7 +137,7 @@ public class TestClient {
}
@Override
- public void onMessage(byte[] message) {
+ public void onBinaryFrame(byte[] message, boolean finalFragment, int
rsv) {
received.add(message);
LOG.info("[ws] received bytes --> " + Arrays.toString(message));
latch.countDown();
@@ -146,7 +145,7 @@ public class TestClient {
@Override
- public void onMessage(String message) {
+ public void onTextFrame(String message, boolean finalFragment, int
rsv) {
received.add(message);
LOG.info("[ws] received --> " + message);
latch.countDown();
diff --git a/parent/pom.xml b/parent/pom.xml
index acd5543..32a7790 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -43,7 +43,7 @@
<activemq-version>5.15.3</activemq-version>
<activemq-artemis-version>2.4.0</activemq-artemis-version>
<aether-version>1.0.2.v20150114</aether-version>
- <ahc-version>2.0.38</ahc-version>
+ <ahc-version>2.3.0</ahc-version>
<ant-bundle-version>1.7.0_6</ant-bundle-version>
<antlr-bundle-version>3.5.2_1</antlr-bundle-version>
<antlr-runtime-bundle-version>3.5.2_1</antlr-runtime-bundle-version>
--
To stop receiving notification emails like this one, please contact
[email protected].