Repository: airavata Updated Branches: refs/heads/master ee1df1b4a -> b8bb82ce8
Improvement to zk usage in orchestrator Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b8bb82ce Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b8bb82ce Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b8bb82ce Branch: refs/heads/master Commit: b8bb82ce835062dd741d8ff76041f25049499977 Parents: ee1df1b Author: lahiru <[email protected]> Authored: Wed Sep 17 11:28:34 2014 -0400 Committer: lahiru <[email protected]> Committed: Wed Sep 17 11:28:34 2014 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 14 ++-- .../impl/push/amqp/SimpleJobFinishConsumer.java | 4 +- .../server/OrchestratorServerHandler.java | 72 ++++++++++++-------- .../core/context/OrchestratorContext.java | 6 +- 4 files changed, 54 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 827a187..490fbf9 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -46,9 +46,9 @@ public class CreateLaunchExperiment { private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_bb2fe905-718b-4f4a-91ad-e1551feb06c3"; - private static String wrfAppId = "WRF_35684c10-7749-4691-939b-225903fd2f73"; - private static String amberAppId = "Amber_f4384478-a707-45f9-b2ed-477fe7fb486b"; + private static String echoAppId = "Echo_b7cebf37-df12-4803-a50c-efdbc2edd9b6"; + private static String wrfAppId = "WRF_5f097c9c-7066-49ec-aed7-4e39607b3adc"; + private static String amberAppId = "Amber_89906be6-5678-49a6-9d04-a0604fbdef2e"; private static String localHost = "localhost"; private static String trestlesHostName = "trestles.sdsc.xsede.org"; @@ -59,14 +59,14 @@ public class CreateLaunchExperiment { try { airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavataClient.getAPIVersion()); -// registerApplications(); + registerApplications(); //// final String expId = createExperimentForSSHHost(airavata); // final String expId = createEchoExperimentForTrestles(airavataClient); // final String expId = createEchoExperimentForStampede(airavataClient); - final String expId = createExperimentEchoForLocalHost(airavataClient); +// final String expId = createExperimentEchoForLocalHost(airavataClient); // final String expId = createExperimentWRFTrestles(airavataClient); // final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); @@ -74,9 +74,9 @@ public class CreateLaunchExperiment { // final String expId = createExperimentForStampedeAmber(airavataClient); // final String expId = createExperimentForTrestlesAmber(airavataClient); - System.out.println("Experiment ID : " + expId); +// System.out.println("Experiment ID : " + expId); // updateExperiment(airavata, expId); - launchExperiment(airavataClient, expId); +// launchExperiment(airavataClient, expId); // System.out.println("retrieved exp id : " + experiment.getExperimentID()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java index 3d62fc0..407d208 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java @@ -63,13 +63,13 @@ public class SimpleJobFinishConsumer { ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception ex) { - logger.error("Cannot connect to a RabbitMQ Server: " + ex); + logger.error("Cannot connect to a RabbitMQ Server: " , ex); } } }).start(); } catch (Exception ex) { - logger.error("Cannot connect to a RabbitMQ Server: " + ex); + logger.error("Cannot connect to a RabbitMQ Server: " , ex); logger.info("------------- Push monitoring for HPC jobs is disabled -------------"); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 1f5160f..c1a00a8 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -103,34 +103,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } - Stat zkStat = zk.exists(OrchServer, false); - if (zkStat == null) { - zk.create(OrchServer, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - String instantNode = OrchServer - + File.separator - + String.valueOf(new Random() - .nextInt(Integer.MAX_VALUE)); - zkStat = zk.exists(instantNode, false); - if (zkStat == null) { - zk.create(instantNode, airavataServerHostPort.getBytes(), - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // other - // component - // will - // watch - // these - // childeren - // creation - // deletion - // to - // monitor - // the - // status - // of - // the - // node - } + registerOrchestratorService(airavataServerHostPort, OrchServer); // creating a watch in orchestrator to monitor the gfac // instances zk.getChildren(ServerSettings.getSetting( @@ -162,7 +135,24 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } } - /** + private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException { + Stat zkStat = zk.exists(orchServer, false); + if (zkStat == null) { + zk.create(orchServer, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + String instantNode = orchServer + + File.separator + + String.valueOf(new Random() + .nextInt(Integer.MAX_VALUE)); + zkStat = zk.exists(instantNode, false); + if (zkStat == null) { + zk.create(instantNode, airavataServerHostPort.getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + } + + /** * * After creating the experiment Data user have the * experimentID as the * handler to the experiment, during the launchExperiment * We just have to * give the experimentID * * @param experimentID * @return sucess/failure * @@ -307,7 +297,27 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, case SyncConnected: mutex.notify(); break; - } + case Expired:case Disconnected: + try { + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + String airavataServerHostPort = ServerSettings + .getSetting(Constants.ORCHESTRATOR_SERVER_HOST) + + ":" + + ServerSettings + .getSetting(Constants.ORCHESTRATOR_SERVER_PORT); + String OrchServer = ServerSettings + .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); + registerOrchestratorService(airavataServerHostPort, OrchServer); + } catch (IOException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } + } if (watchedEvent.getPath() != null && watchedEvent.getPath().startsWith( ServerSettings.getSetting( @@ -358,6 +368,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, }).start(); break; } + + } } catch (KeeperException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java index 7cd212a..542017c 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java @@ -39,7 +39,7 @@ public class OrchestratorContext { private Registry newRegistry; - private ZooKeeper zk; + private static ZooKeeper zk; // this instance can be accessed by the Validators and other components public OrchestratorContext(List<GFACInstance> gfacInstanceList) { if (gfacInstanceList != null) { @@ -81,11 +81,11 @@ public class OrchestratorContext { this.gfacInstanceList.addAll(gfacInstanceList); } - public void setZk(ZooKeeper zk) { + public void setZk(ZooKeeper zk) { this.zk = zk; } - public ZooKeeper getZk() { + public static ZooKeeper getZk() { return zk; } }
