Repository: airavata Updated Branches: refs/heads/master 51f456d8c -> 0c981d6fc
adding reconnect code to airavata-server Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0c981d6f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0c981d6f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0c981d6f Branch: refs/heads/master Commit: 0c981d6fc97e748ff5746bd6e3d92c94228b0f1d Parents: 51f456d Author: lahiru <[email protected]> Authored: Thu Oct 16 15:37:33 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu Oct 16 15:37:33 2014 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 134 +++++++++++-------- 1 file changed, 80 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/0c981d6f/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index fa52501..a59f05e 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -101,68 +101,91 @@ public class AiravataServerHandler implements Airavata.Iface, Watcher { public AiravataServerHandler() { try { - String zkhostPort = AiravataZKUtils.getZKhostPort(); - String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST) - + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT); + 
storeServerConfig(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } + } - try { - zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data - String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server"); - String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server"); - String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server"); - String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments"); - - synchronized (mutex) { - mutex.wait(); // waiting for the syncConnected event - } - Stat zkStat = zk.exists(apiServer, false); - if (zkStat == null) { - zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE)); - zkStat = zk.exists(instantNode, false); - if (zkStat == null) { - zk.create(instantNode, - airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL); // other component will watch these childeren creation deletion to monitor the status of the node - logger.info("Successfully created airavata-server node"); - } + private void storeServerConfig() throws ApplicationSettingsException { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST) + + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT); - zkStat = zk.exists(OrchServer, false); - if (zkStat == null) { - zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - 
logger.info("Successfully created orchestrator-server node"); - } - zkStat = zk.exists(gfacServer, false); - if (zkStat == null) { - zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - logger.info("Successfully created gfac-server node"); - } - zkStat = zk.exists(gfacServer, false); - if (zkStat == null) { - zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - logger.info("Successfully created gfac-server node"); - } - logger.info("Finished starting ZK: " + zk); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (KeeperException e) { - e.printStackTrace(); + try { + zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data + String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server"); + String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server"); + String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server"); + String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments"); + + synchronized (mutex) { + mutex.wait(); // waiting for the syncConnected event } - } catch (ApplicationSettingsException e) { + Stat zkStat = zk.exists(apiServer, false); + if (zkStat == null) { + zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE)); + zkStat = zk.exists(instantNode, false); + if (zkStat == null) { + zk.create(instantNode, + airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); // 
other component will watch these childeren creation deletion to monitor the status of the node + logger.info("Successfully created airavata-server node"); + } + + zkStat = zk.exists(OrchServer, false); + if (zkStat == null) { + zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + logger.info("Successfully created orchestrator-server node"); + } + zkStat = zk.exists(gfacServer, false); + if (zkStat == null) { + zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + logger.info("Successfully created gfac-server node"); + } + zkStat = zk.exists(gfacServer, false); + if (zkStat == null) { + zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + logger.info("Successfully created gfac-server node"); + } + logger.info("Finished starting ZK: " + zk); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { e.printStackTrace(); } } - synchronized public void process(WatchedEvent watchedEvent) { synchronized (mutex) { - mutex.notify(); + Event.KeeperState state = watchedEvent.getState(); + logger.info(state.name()); + if (state == Event.KeeperState.SyncConnected) { + mutex.notify(); + } else if(state == Event.KeeperState.Expired || + state == Event.KeeperState.Disconnected){ + try { + mutex = -1; + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + synchronized (mutex) { + mutex.wait(); // waiting for the syncConnected event + } + storeServerConfig(); + } catch (IOException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } } @@ -912,6 +935,7 @@ public class AiravataServerHandler implements Airavata.Iface, Watcher { throw e; }finally { orchestratorClient.getOutputProtocol().getTransport().close(); + 
orchestratorClient.getInputProtocol().getTransport().close(); } @@ -1242,6 +1266,8 @@ public class AiravataServerHandler implements Airavata.Iface, Watcher { throw new TException(e); }finally { orchestratorClient.getOutputProtocol().getTransport().close(); + orchestratorClient.getInputProtocol().getTransport().close(); + } return true; }
