Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3.1 b81265299 -> f57674e7a


MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f57674e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f57674e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f57674e7

Branch: refs/heads/devel-3.1
Commit: f57674e7a68c232f07636fd6582103732333ec77
Parents: b812652
Author: Timothy Farkas <[email protected]>
Authored: Wed Sep 2 17:11:53 2015 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Thu Sep 3 10:40:05 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/lib/io/WebSocketInputOperator.java    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f57674e7/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 02b9ef2..94f8d97 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -154,6 +154,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         try {
           sleep(1000);
           if (connectionClosed && !WebSocketInputOperator.this.shutdown) {
+            connection.close();
             WebSocketInputOperator.this.activate(null);
           }
         }
@@ -184,6 +185,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         }
 
       }));
+
+      if (client != null) {
+        client.closeAsynchronously();
+      }
+
       client = new AsyncHttpClient(config);
       connection = client.prepareGet(uri.toString()).execute(new 
WebSocketUpgradeHandler.Builder().addWebSocketListener(new 
WebSocketTextListener()
       {

Reply via email to