Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3.0 89b68c5a8 -> 6f0739e4d
MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6f0739e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6f0739e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6f0739e4

Branch: refs/heads/devel-3.0
Commit: 6f0739e4d5f8aa73d4440da7cf604cadd849268d
Parents: 89b68c5
Author: Timothy Farkas <[email protected]>
Authored: Wed Sep 2 17:11:53 2015 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Thu Sep 3 10:41:04 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/lib/io/WebSocketInputOperator.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6f0739e4/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index dabcacb..e38d874 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -153,6 +153,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
       try {
         sleep(1000);
         if (connectionClosed && !WebSocketInputOperator.this.shutdown) {
+          connection.close();
           WebSocketInputOperator.this.activate(null);
         }
       }
@@ -183,6 +184,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         }
       }));
 
+
+      if (client != null) {
+        client.closeAsynchronously();
+      }
+
       client = new AsyncHttpClient(config);
       connection = client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener() {
