Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 076979ac7 -> 010ff2540
APEX-275 Added synchronized block so no two threads will try to reconnect to websocket server Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/32035787 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/32035787 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/32035787 Branch: refs/heads/release-3.2 Commit: 320357876653dc2b89529320809c033ec16698e7 Parents: 076979a Author: David Yan <[email protected]> Authored: Thu Nov 19 14:36:44 2015 -0800 Committer: David Yan <[email protected]> Committed: Mon Nov 23 10:48:50 2015 -0800 ---------------------------------------------------------------------- .../stram/util/SharedPubSubWebSocketClient.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/32035787/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java index 96d5a40..7516e46 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java +++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java @@ -102,16 +102,18 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient @Override public void publish(String topic, Object data) throws IOException { - if (!isConnectionOpen()) { - try { - long now = System.currentTimeMillis(); - if (lastConnectTryTime + minWaitConnectionRetry < now) { - lastConnectTryTime = now; - openConnectionAsync(); + synchronized (this) { + if (!isConnectionOpen()) { + try { + long now = System.currentTimeMillis(); + if (lastConnectTryTime + minWaitConnectionRetry < now) { + lastConnectTryTime = now; + openConnectionAsync(); + } + } catch (Exception ex) { + LOG.debug("Failed attempt to reconnect to websocket server", ex); } } - catch (Exception ex) { - } } super.publish(topic, data); }
