Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 892355ca1 -> 9656248cf
APEX-275 #resolve #comment Added synchronized block so no two threads will try to reconnect to websocket server Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d71473da Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d71473da Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d71473da Branch: refs/heads/devel-3 Commit: d71473dafb5d501fcb5b71ecc799dc63e6ecfacc Parents: ca542e3 Author: David Yan <[email protected]> Authored: Thu Nov 19 14:36:44 2015 -0800 Committer: Timothy Farkas <[email protected]> Committed: Mon Nov 23 12:20:58 2015 -0800 ---------------------------------------------------------------------- .../stram/util/SharedPubSubWebSocketClient.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d71473da/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java index 96d5a40..7516e46 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java +++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java @@ -102,16 +102,18 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient @Override public void publish(String topic, Object data) throws IOException { - if (!isConnectionOpen()) { - try { - long now = System.currentTimeMillis(); - if (lastConnectTryTime + minWaitConnectionRetry < now) { - lastConnectTryTime = now; - openConnectionAsync(); + synchronized (this) { + if (!isConnectionOpen()) { + try { + long now = System.currentTimeMillis(); + if (lastConnectTryTime + minWaitConnectionRetry < now) { + lastConnectTryTime = now; + openConnectionAsync(); + } + } catch (Exception ex) { + LOG.debug("Failed attempt to reconnect to websocket server", ex); } } - catch (Exception ex) { - } } super.publish(topic, data); }
