This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 73942aa Removing unwanted zookeeper path creation inside orchestrator
73942aa is described below
commit 73942aaf1397db80a24804a727b434ab7c731d95
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu Oct 18 11:42:54 2018 -0400
Removing unwanted zookeeper path creation inside orchestrator
---
.../server/OrchestratorServerHandler.java | 45 ++++------------------
1 file changed, 7 insertions(+), 38 deletions(-)
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a49c168..f79a4ba 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -75,13 +75,10 @@ import java.util.*;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log =
LoggerFactory.getLogger(OrchestratorServerHandler.class);
private SimpleOrchestratorImpl orchestrator = null;
- private static Integer mutex = new Integer(-1);
private String airavataUserName;
private String gatewayName;
private Publisher publisher;
- private final Subscriber statusSubscribe;
private final Subscriber experimentSubscriber;
- private CuratorFramework curatorClient;
/**
* Query orchestrator server to fetch the CPI version
@@ -101,23 +98,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
publisher = MessagingFactory.getPublisher(Type.STATUS);
orchestrator.initialize();
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
- statusSubscribe = getStatusSubscriber();
experimentSubscriber = getExperimentSubscriber();
- startCurator();
} catch (OrchestratorException | AiravataException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while
initializing orchestrator service", e);
}
}
- private Subscriber getStatusSubscriber() throws AiravataException {
- List<String> routingKeys = new ArrayList<>();
-// routingKeys.add("*"); // listen for gateway level
messages
-// routingKeys.add("*.*"); // listen for
gateway/experiment level messages
- routingKeys.add("*.*.*"); // listen for
gateway/experiment/process level messages
- return MessagingFactory.getSubscriber(new
ProcessStatusHandler(),routingKeys, Type.STATUS);
- }
-
private Subscriber getExperimentSubscriber() throws AiravataException {
List<String> routingKeys = new ArrayList<>();
routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
@@ -136,10 +123,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
ExperimentModel experiment = null;
final RegistryService.Client registryClient =
getRegistryServiceClient();
try {
- String experimentNodePath = GFacUtils.getExperimentNodePath
(experimentId);
-
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(),
experimentNodePath);
- String experimentCancelNode =
ZKPaths.makePath(experimentNodePath,
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(),
experimentCancelNode);
experiment = registryClient.getExperiment(experimentId);
if (experiment == null) {
log.error("Error retrieving the Experiment by the given
experimentID: {} ", experimentId);
@@ -449,21 +432,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
orchestrator.cancelExperiment(experimentModel,
token);
- // TODO deprecate this approach as we are
replacing gfac
- String expCancelNodePath =
ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
- experimentId),
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
- Stat stat =
curatorClient.checkExists().forPath(expCancelNodePath);
- if (stat != null) {
-
curatorClient.setData().withVersion(-1).forPath(expCancelNodePath,
ZkConstants.ZOOKEEPER_CANCEL_REQEUST
- .getBytes());
- ExperimentStatus status = new
ExperimentStatus(ExperimentState.CANCELING);
- status.setReason("Experiment cancel
request processed");
-
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status,
publisher, gatewayId);
- log.info("expId : " + experimentId + "
:- Experiment status updated to " + status.getState());
- return true;
- }
- return false;
+
+ ExperimentStatus status = new
ExperimentStatus(ExperimentState.CANCELING);
+ status.setReason("Experiment cancel request
processed");
+
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status,
publisher, gatewayId);
+ log.info("expId : " + experimentId + " :-
Experiment status updated to " + status.getState());
+ return true;
}
}
@@ -477,12 +452,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// }
}
- private void startCurator() throws ApplicationSettingsException {
- String connectionSting =
ServerSettings.getZookeeperConnection();
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
- curatorClient =
CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
- curatorClient.start();
- }
private class SingleAppExperimentRunner implements Runnable {
String experimentId;