Repository: airavata Updated Branches: refs/heads/master 420a51ad6 -> 2bf83dd03
No need to check the GFac node in ZooKeeper before publishing the Terminate request to the work queue. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2bf83dd0 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2bf83dd0 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2bf83dd0 Branch: refs/heads/master Commit: 2bf83dd03cb1f312a8c1de114aedcd0801337901 Parents: 420a51a Author: shamrath <[email protected]> Authored: Wed May 13 16:24:00 2015 -0400 Committer: shamrath <[email protected]> Committed: Wed May 13 16:24:00 2015 -0400 ---------------------------------------------------------------------- .../core/impl/GFACPassiveJobSubmitter.java | 55 +++----------------- 1 file changed, 8 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/2bf83dd0/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index cd5b45b..1faef9e 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -135,20 +135,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { * @throws OrchestratorException */ public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException { - ZooKeeper zk = orchestratorContext.getZk(); + String gatewayId = null; try 
{ - if (zk == null || !zk.getState().isConnected()) { - String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); - logger.info("Waiting for zookeeper to connect to the server"); - synchronized (mutex) { - mutex.wait(5000); - } - } - String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - List<String> children = zk.getChildren(gfacServer, this); - String gatewayId = null; CredentialReader credentialReader = GFacUtils.getCredentialReader(); if (credentialReader != null) { try { @@ -158,45 +146,18 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { } } if (gatewayId == null || gatewayId.isEmpty()) { + gatewayId = ServerSettings.getDefaultUserGateway(); + } - if (children.size() == 0) { - // Zookeeper data need cleaning - throw new OrchestratorException("There is no active GFac instance to route the request"); - } else { - String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); - // here we are not using an index because the getChildren does not return the same order everytime - String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); - logger.info("GFAC instance node data: " + gfacNodeData); - String[] split = gfacNodeData.split(":"); - if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { - // before submitting the job we check again the state of the node - TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId); - MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - 
publisher.publish(messageContext); - } - } - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (KeeperException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (ApplicationSettingsException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (IOException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); + TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId); + MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + return true; } catch (Exception e) { - logger.error(e.getMessage(), e); throw new OrchestratorException(e); - }finally { - closeZK(orchestratorContext); } - return false; - } private void closeZK(OrchestratorContext orchestratorContext) {
