Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 a9bcce182 -> ef39a1b73
MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ef39a1b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ef39a1b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ef39a1b7 Branch: refs/heads/devel-3 Commit: ef39a1b73af4bb09024dc5286d891a68e95305ff Parents: a9bcce1 Author: Timothy Farkas <[email protected]> Authored: Wed Sep 2 17:11:53 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Thu Sep 3 10:10:16 2015 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/lib/io/WebSocketInputOperator.java | 6 ++++++ 1 file changed, 6 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ef39a1b7/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index 02b9ef2..94f8d97 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -154,6 +154,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> try { sleep(1000); if (connectionClosed && !WebSocketInputOperator.this.shutdown) { + connection.close(); WebSocketInputOperator.this.activate(null); } } @@ -184,6 +185,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> } })); + + if (client != null) { + client.closeAsynchronously(); + } + client = new AsyncHttpClient(config); connection = client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener() {
