This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5c09b65 Retry tunnel setup when it fails (#3299)
5c09b65 is described below
commit 5c09b65712148bf1a42c35bd22930ba2ba5ac3c1
Author: Ning Wang <[email protected]>
AuthorDate: Tue Jun 25 16:33:40 2019 -0700
Retry tunnel setup when it fails (#3299)
* Retry tunnel setup when it fails
* add delay between retries
* fix bug
---
.../zookeeper/curator/CuratorStateManager.java | 34 +++++++++++++++-------
1 file changed, 24 insertions(+), 10 deletions(-)
diff --git
a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index fe097e5..78129cc 100644
---
a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++
b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -63,6 +63,8 @@ import org.apache.zookeeper.Watcher;
public class CuratorStateManager extends FileSystemStateManager {
private static final Logger LOG =
Logger.getLogger(CuratorStateManager.class.getName());
+ private static final int TUNNEL_SETUP_RETRY = 0; // 0 means no retry
+ private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;
private CuratorFramework client;
private String connectionString;
@@ -83,17 +85,29 @@ public class CuratorStateManager extends
FileSystemStateManager {
NetworkUtils.TunnelConfig.build(config,
NetworkUtils.HeronSystem.STATE_MANAGER);
if (tunnelConfig.isTunnelNeeded()) {
- Pair<String, List<Process>> tunneledResults =
setupZkTunnel(tunnelConfig);
-
- String newConnectionString = tunneledResults.first;
- if (newConnectionString.isEmpty()) {
- throw new IllegalArgumentException("Failed to connect to tunnel host '"
- + tunnelConfig.getTunnelHost() + "'");
+ for (int setupCount = 0;; ++setupCount) {
+ Pair<String, List<Process>> tunneledResults =
setupZkTunnel(tunnelConfig);
+ String newConnectionString = tunneledResults.first;
+
+ // If tunnel can't be setup correctly. Retry or bail.
+ if (!newConnectionString.isEmpty()) {
+ // Success, use the new connection string
+ connectionString = newConnectionString;
+ tunnelProcesses.addAll(tunneledResults.second);
+ break;
+ } else {
+ if (setupCount < TUNNEL_SETUP_RETRY) {
+ try {
+ TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ throw new IllegalArgumentException("Failed to connect to tunnel
host '"
+ + tunnelConfig.getTunnelHost() + "'");
+ }
+ }
}
-
- // Use the new connection string
- connectionString = newConnectionString;
- tunnelProcesses.addAll(tunneledResults.second);
}
// Start it