Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3.1 b81265299 -> f57674e7a
MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f57674e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f57674e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f57674e7

Branch: refs/heads/devel-3.1
Commit: f57674e7a68c232f07636fd6582103732333ec77
Parents: b812652
Author: Timothy Farkas <[email protected]>
Authored: Wed Sep 2 17:11:53 2015 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Thu Sep 3 10:40:05 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/lib/io/WebSocketInputOperator.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f57674e7/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 02b9ef2..94f8d97 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -154,6 +154,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         try {
           sleep(1000);
           if (connectionClosed && !WebSocketInputOperator.this.shutdown) {
+            connection.close();
             WebSocketInputOperator.this.activate(null);
           }
         }
@@ -184,6 +185,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
       }
     }));
+
+    if (client != null) {
+      client.closeAsynchronously();
+    }
+
     client = new AsyncHttpClient(config);
     connection = client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener() {
