Repository: camel Updated Branches: refs/heads/camel-2.15.x 0dd343e9a -> 0dfbbb76c refs/heads/camel-2.16.x c272e40bc -> 66b436fb7 refs/heads/master cd1e214aa -> 02434f00b
CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02434f00 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02434f00 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02434f00 Branch: refs/heads/master Commit: 02434f00bf6b29872af867f1100ea69743a00620 Parents: cd1e214 Author: Claus Ibsen <[email protected]> Authored: Sun Mar 6 12:36:07 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Sun Mar 6 12:36:07 2016 +0100 ---------------------------------------------------------------------- .../camel/component/ahc/ws/WsEndpoint.java | 38 ++++++++++++-------- .../camel/component/ahc/ws/WsProducer.java | 2 +- 2 files changed, 24 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/02434f00/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java index a7ec795..5187673 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java @@ -16,10 +16,8 @@ */ package org.apache.camel.component.ahc.ws; -import java.io.IOException; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ExecutionException; import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.AsyncHttpClientConfig; @@ -47,7 +45,7 @@ public class WsEndpoint extends AhcEndpoint { private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class); // for using websocket streaming/fragments - private static final boolean GRIZZLY_AVAILABLE = + private static final boolean GRIZZLY_AVAILABLE = probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider"); private final Set<WsConsumer> consumers = new HashSet<WsConsumer>(); @@ -55,7 +53,7 @@ public class WsEndpoint extends AhcEndpoint { private WebSocket websocket; @UriParam private boolean useStreaming; - + public WsEndpoint(String endpointUri, WsComponent component) { super(endpointUri, component, null); } @@ -68,7 +66,7 @@ public class WsEndpoint extends AhcEndpoint { return false; } } - + @Override public WsComponent getComponent() { return (WsComponent) super.getComponent(); @@ -120,34 +118,44 @@ public class WsEndpoint extends AhcEndpoint { } else { client = new AsyncHttpClient(ahp, config); } - return client; + return client; } - public void connect() throws InterruptedException, ExecutionException, IOException { - websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute( + public void connect() throws Exception { + String uri = getHttpUri().toASCIIString(); + + LOG.debug("Connecting to {}", uri); + websocket = getClient().prepareGet(uri).execute( new WebSocketUpgradeHandler.Builder() .addWebSocketListener(new WsListener()).build()).get(); } - + @Override protected void doStop() throws Exception { if (websocket != null && websocket.isOpen()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString()); + } websocket.close(); websocket = null; } super.doStop(); } - void connect(WsConsumer wsConsumer) { + void connect(WsConsumer wsConsumer) throws Exception { consumers.add(wsConsumer); + + if (websocket == null || !websocket.isOpen()) { + connect(); + } } void disconnect(WsConsumer wsConsumer) { consumers.remove(wsConsumer); } - + class WsListener implements WebSocketTextListener, WebSocketByteListener { - + @Override public void onOpen(WebSocket websocket) { LOG.debug("websocket opened"); @@ -165,7 +173,7 @@ public class WsEndpoint extends AhcEndpoint { @Override public void onMessage(byte[] message) { - LOG.debug("received message --> {}", message); + LOG.debug("Received message --> {}", message); for (WsConsumer consumer : consumers) { consumer.sendMessage(message); } @@ -173,14 +181,14 @@ public class WsEndpoint extends AhcEndpoint { @Override public void onMessage(String message) { - LOG.debug("received message --> {}", message); + LOG.debug("Received message --> {}", message); for (WsConsumer consumer : consumers) { consumer.sendMessage(message); } } } - + protected AsyncHttpProvider getAsyncHttpProvider(AsyncHttpClientConfig config) { if (GRIZZLY_AVAILABLE) { return new GrizzlyAsyncHttpProvider(config); http://git-wip-us.apache.org/repos/asf/camel/blob/02434f00/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java index d6319ad..5935fc2 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java @@ -46,8 +46,8 @@ public class WsProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { Message in = exchange.getIn(); Object message = in.getBody(); - log.debug("Sending out {}", message); if (message != null) { + log.debug("Sending out {}", message); if (message instanceof String) { sendMessage(getWebSocket(), (String)message, getEndpoint().isUseStreaming()); } else if (message instanceof byte[]) {
