Repository: airavata Updated Branches: refs/heads/master 4f95a7908 -> a4daa528a
To fix Airavata-1661. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c5c83ea5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c5c83ea5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c5c83ea5 Branch: refs/heads/master Commit: c5c83ea5399b99e7eb72c23579c0cf2aa95c1be1 Parents: 71a94a0 Author: raminder <[email protected]> Authored: Mon Apr 13 17:07:56 2015 -0400 Committer: raminder <[email protected]> Committed: Mon Apr 13 17:07:56 2015 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 2 +- .../client/samples/CancelExperiments.java | 3 ++- .../server/OrchestratorServerHandler.java | 6 +++--- .../util/OrchestratorRecoveryHandler.java | 2 +- .../core/impl/GFACEmbeddedJobSubmitter.java | 2 +- .../core/impl/GFACPassiveJobSubmitter.java | 19 +++++++++++++++---- .../core/impl/GFACRPCJobSubmitter.java | 2 +- .../orchestrator/core/job/JobSubmitter.java | 2 +- .../cpi/impl/SimpleOrchestratorImpl.java | 2 +- 9 files changed, 26 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 66eca0d..00ad99f 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -1403,7 +1403,7 @@ public class AiravataServerHandler implements Airavata.Iface { @Override public void terminateExperiment(String airavataExperimentId, String tokenId) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, TException { Client client = getOrchestratorClient(); - client.terminateExperiment(airavataExperimentId); + client.terminateExperiment(airavataExperimentId, tokenId); } /** http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java index 29eec9e..8d41295 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java @@ -59,7 +59,8 @@ public class CancelExperiments { public static void terminateExperiment(Airavata.Client client, String expId) throws TException { try { - client.terminateExperiment(expId); + String tokenId = "-0bbb-403b-a88a-42b6dbe198e9"; + client.terminateExperiment(expId, tokenId); } catch (ExperimentNotFoundException e) { logger.error("Error occured while launching the experiment...", e.getMessage()); throw new ExperimentNotFoundException(e); http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index df64638..dafd39c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -330,7 +330,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, */ public boolean terminateExperiment(String experimentId, String tokenId) throws TException { log.infoId(experimentId, "Experiment: {} is cancelling !!!!!", experimentId); - return validateStatesAndCancel(experimentId); + return validateStatesAndCancel(experimentId, tokenId); } /** @@ -536,7 +536,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, return selectedModuleId; } - private boolean validateStatesAndCancel(String experimentId)throws TException{ + private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{ try { Experiment experiment = (Experiment) registry.get( RegistryModelType.EXPERIMENT, experimentId); @@ -643,7 +643,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, // job submisssion+monitoring // launching the experiment orchestrator.cancelExperiment(experiment, - workflowNodeDetail, taskDetails, null); + workflowNodeDetail, taskDetails, tokenId); // after performing gfac level cancel operation // mark task cancelled http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java index 9005f70..c0a8890 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java @@ -82,7 +82,7 @@ public class OrchestratorRecoveryHandler implements Watcher { log.info("------------------------------------------------------------------------------------"); try { if(GFacUtils.isCancelled(expId.split("\\+")[0], zk)) {// during relaunching we check the operation and then launch - serverHandler.terminateExperiment(expId.split("\\+")[0]); + serverHandler.terminateExperiment(expId.split("\\+")[0], null); }else { serverHandler.launchExperiment(expId.split("\\+")[0], null); } http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java index bafb45e..3008c63 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java @@ -81,7 +81,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter { } } - public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException { return false; } http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index c17638b..af60d85 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -150,7 +150,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { * @return * @throws OrchestratorException */ - public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException { ZooKeeper zk = orchestratorContext.getZk(); try { if (zk == null || !zk.getState().isConnected()) { @@ -163,7 +163,18 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); List<String> children = zk.getChildren(gfacServer, this); - + String gatewayId = null; + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + try { + gatewayId = credentialReader.getGatewayID(tokenId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + } + } + if (gatewayId == null || gatewayId.isEmpty()) { + gatewayId = ServerSettings.getDefaultUserGateway(); + } if (children.size() == 0) { // Zookeeper data need cleaning throw new OrchestratorException("There is no active GFac instance to route the request"); @@ -175,8 +186,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { String[] split = gfacNodeData.split(":"); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null, null); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), null); + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId); publisher.publish(messageContext); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java index 5a1be5a..64ced47 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java @@ -140,7 +140,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { return false; } - public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException { ZooKeeper zk = orchestratorContext.getZk(); GfacService.Client localhost = null; try { http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java index be0c76c..b885da5 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java @@ -60,5 +60,5 @@ public interface JobSubmitter { * @return * @throws OrchestratorException */ - boolean terminate(String experimentID,String taskID)throws OrchestratorException; + boolean terminate(String experimentID,String taskID, String tokenId)throws OrchestratorException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/c5c83ea5/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index 3c7c294..0a768bd 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -169,7 +169,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ return; } } - jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID()); + jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId); }
