Repository: airavata Updated Branches: refs/heads/master c05d5e6c8 -> aa27ce109
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java new file mode 100644 index 0000000..8029a0c --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.messaging.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RabbitMQTaskLaunchPublisher implements Publisher{ + private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class); + public static final String LAUNCH_TASK = "launch.task"; + public static final String TERMINATE_TASK = "teminate.task"; + + private RabbitMQProducer rabbitMQProducer; + + public RabbitMQTaskLaunchPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); + } + + public void publish(MessageContext msgCtx) throws AiravataException { + try { + log.info("Publishing to lauch queue ..."); + byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(msgCtx.getMessageId()); + message.setMessageType(msgCtx.getType()); + message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); + String routingKey = null; + if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){ + TaskSubmitEvent event = (TaskSubmitEvent) msgCtx.getEvent(); + routingKey = LAUNCH_TASK + "."+event.getExperimentId() + "." + + event.getTaskId() + "." + event.getGatewayId(); + }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){ + TaskTerminateEvent event = (TaskTerminateEvent) msgCtx.getEvent(); + routingKey = TERMINATE_TASK + "."+event.getExperimentId() + "." + + event.getTaskId(); + } + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + rabbitMQProducer.send(messageBody, routingKey); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index e88945d..b200468 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -103,7 +103,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, // registering with zk try { if (ServerSettings.isRabbitMqPublishEnabled()) { - publisher = PublisherFactory.createPublisher(); + publisher = PublisherFactory.createActivityPublisher(); } String zkhostPort = AiravataZKUtils.getZKhostPort(); String airavataServerHostPort = ServerSettings @@ -156,6 +156,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, registry = RegistryFactory.getDefaultRegistry(); orchestrator.initialize(); orchestrator.getOrchestratorContext().setZk(this.zk); + orchestrator.getOrchestratorContext().setPublisher(this.publisher); } catch (OrchestratorException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml index 0dffaee..23863fb 100644 --- a/modules/orchestrator/orchestrator-core/pom.xml +++ b/modules/orchestrator/orchestrator-core/pom.xml @@ -121,11 +121,21 @@ the License. --> <artifactId>airavata-server-configuration</artifactId> <scope>test</scope> </dependency> - <!-- zookeeper dependencies --> + <!-- zookeeper and curator dependencies --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>3.4.0</version> + <version>${zk.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java index 5d465ff..b77087b 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.airavata.gfac.client.GFACInstance; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.orchestrator.core.OrchestratorConfiguration; import org.apache.airavata.registry.cpi.Registry; import org.apache.zookeeper.ZooKeeper; @@ -39,6 +40,8 @@ public class OrchestratorContext { private Registry newRegistry; private static ZooKeeper zk; // this instance can be accessed by the Validators and other components + + private Publisher publisher; public OrchestratorContext(List<GFACInstance> gfacInstanceList) { if (gfacInstanceList != null) { @@ -48,6 +51,14 @@ public class OrchestratorContext { } } + public Publisher getPublisher() { + return publisher; + } + + public void setPublisher(Publisher publisher) { + this.publisher = publisher; + } + public OrchestratorContext() { this(null); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java new file mode 100644 index 0000000..58ac982 --- /dev/null +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.orchestrator.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.client.GFACInstance; +import org.apache.airavata.gfac.client.GFacClientFactory; +import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.gfac.cpi.GfacService; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.PublisherFactory; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskSubmitEvent; +import org.apache.airavata.orchestrator.core.context.OrchestratorContext; +import org.apache.airavata.orchestrator.core.exception.OrchestratorException; +import org.apache.airavata.orchestrator.core.job.JobSubmitter; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** + * This class can be used to do the communication between orchestrator and gfac to handle using a queue + */ +public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { + private final static Logger logger = LoggerFactory.getLogger(GFACPassiveJobSubmitter.class); + + public static final String IP = "ip"; + + private OrchestratorContext orchestratorContext; + + private static Integer mutex = -1; + + private Publisher publisher; + + public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { + this.orchestratorContext = orchestratorContext; + if(orchestratorContext.getPublisher()!=null){ // use the same publisher this will be empty if rabbitmq.publish is not enabled in the configuraiton + this.publisher = orchestratorContext.getPublisher(); + }else { + try { + this.publisher = PublisherFactory.createTaskLaunchPublisher(); + } catch (AiravataException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException("Cannot initialize " + GFACPassiveJobSubmitter.class + " need to start Rabbitmq server to use " + GFACPassiveJobSubmitter.class); + } + } + } + + public GFACInstance selectGFACInstance() throws OrchestratorException { + // currently we only support one instance but future we have to pick an + // instance + return null; + } + + public boolean submit(String experimentID, String taskID) throws OrchestratorException { + return submit(experimentID, taskID, null); + } + + /** + * Submit the job to a shared launch.queue accross multiple gfac instances + * + * @param experimentID + * @param taskID + * @param tokenId + * @return + * @throws OrchestratorException + */ + public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { + + ZooKeeper zk = orchestratorContext.getZk(); + GfacService.Client gfacClient = null; + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { + String gatewayId = null; + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + try { + gatewayId = credentialReader.getGatewayID(tokenId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + } + } + if(gatewayId == null || gatewayId.isEmpty()){ + gatewayId = ServerSettings.getDefaultUserGateway(); + } + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TASK-"+ UUID.randomUUID().toString(),gatewayId); + publisher.publish(messageContext); + } + } + } + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (IOException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + }finally { + gfacClient.getOutputProtocol().getTransport().close(); + } + return true; + + } + + /** + * Submit the experiment the terminate.queue job queue and remove the experiment from shared launch.queue + * @param experimentID + * @param taskID + * @return + * @throws OrchestratorException + */ + public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + ZooKeeper zk = orchestratorContext.getZk(); + GfacService.Client localhost = null; + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) { + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null); + publisher.publish(messageContext); + } + } + } + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (IOException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + }finally { + + } + return false; + + } + + synchronized public void process(WatchedEvent event) { + synchronized (mutex) { + switch (event.getState()) { + case SyncConnected: + mutex.notify(); + } + switch (event.getType()) { + case NodeCreated: + mutex.notify(); + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java new file mode 100644 index 0000000..54339a2 --- /dev/null +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java @@ -0,0 +1,212 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.orchestrator.core.impl; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.client.GFACInstance; +import org.apache.airavata.gfac.client.GFacClientFactory; +import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.gfac.cpi.GfacService; +import org.apache.airavata.orchestrator.core.context.OrchestratorContext; +import org.apache.airavata.orchestrator.core.exception.OrchestratorException; +import org.apache.airavata.orchestrator.core.job.JobSubmitter; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * this class is responsible for submitting a job to gfac in service mode, + * it will select a gfac instance based on the incoming request and submit to that + * gfac instance. + */ +public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { + private final static Logger logger = LoggerFactory.getLogger(GFACRPCJobSubmitter.class); + public static final String IP = "ip"; + + private OrchestratorContext orchestratorContext; + + private static Integer mutex = -1; + + public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { + this.orchestratorContext = orchestratorContext; + } + + public GFACInstance selectGFACInstance() throws OrchestratorException { + // currently we only support one instance but future we have to pick an + // instance + return null; + } + + public boolean submit(String experimentID, String taskID) throws OrchestratorException { + return this.submit(experimentID, taskID, null); + } + + public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { + ZooKeeper zk = orchestratorContext.getZk(); + GfacService.Client gfacClient = null; + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { + String gatewayId = null; + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + try { + gatewayId = credentialReader.getGatewayID(tokenId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + } + } + if(gatewayId == null || gatewayId.isEmpty()){ + gatewayId = ServerSettings.getDefaultUserGateway(); + } + return gfacClient.submitJob(experimentID, taskID, gatewayId); + } + } + } + } catch (TException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (IOException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + }finally { + gfacClient.getOutputProtocol().getTransport().close(); + } + return false; + } + + public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + ZooKeeper zk = orchestratorContext.getZk(); + GfacService.Client localhost = null; + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) { + return localhost.cancelJob(experimentID, taskID); + } + } + } + } catch (TException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (IOException e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new OrchestratorException(e); + }finally { + + } + return false; + } + + synchronized public void process(WatchedEvent event) { + synchronized (mutex) { + switch (event.getState()) { + case SyncConnected: + mutex.notify(); + } + switch (event.getType()) { + case NodeCreated: + mutex.notify(); + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java deleted file mode 100644 index 3bbc588..0000000 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.airavata.orchestrator.core.impl; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Random; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.Constants; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.credential.store.store.CredentialReader; -import org.apache.airavata.gfac.client.GFACInstance; -import org.apache.airavata.gfac.client.GFacClientFactory; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.cpi.GfacService; -import org.apache.airavata.orchestrator.core.context.OrchestratorContext; -import org.apache.airavata.orchestrator.core.exception.OrchestratorException; -import org.apache.airavata.orchestrator.core.job.JobSubmitter; -import org.apache.thrift.TException; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * this class is responsible for submitting a job to gfac in service mode, - * it will select a gfac instance based on the incoming request and submit to that - * gfac instance. - */ -public class GFACServiceJobSubmitter implements JobSubmitter, Watcher { - private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class); - public static final String IP = "ip"; - - private OrchestratorContext orchestratorContext; - - private static Integer mutex = -1; - - public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { - this.orchestratorContext = orchestratorContext; - } - - public GFACInstance selectGFACInstance() throws OrchestratorException { - // currently we only support one instance but future we have to pick an - // instance - return null; - } - - public boolean submit(String experimentID, String taskID) throws OrchestratorException { - return this.submit(experimentID, taskID, null); - } - - public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { - ZooKeeper zk = orchestratorContext.getZk(); - GfacService.Client gfacClient = null; - try { - if (zk == null || !zk.getState().isConnected()) { - String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); - synchronized (mutex) { - mutex.wait(); - } - } - String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - List<String> children = zk.getChildren(gfacServer, this); - - if (children.size() == 0) { - // Zookeeper data need cleaning - throw new OrchestratorException("There is no active GFac instance to route the request"); - } else { - String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); - // here we are not using an index because the getChildren does not return the same order everytime - String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); - logger.info("GFAC instance node data: " + gfacNodeData); - String[] split = gfacNodeData.split(":"); - gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); - if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { - // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { - String gatewayId = null; - CredentialReader credentialReader = GFacUtils.getCredentialReader(); - if (credentialReader != null) { - try { - gatewayId = credentialReader.getGatewayID(tokenId); - } catch (Exception e) { - logger.error(e.getLocalizedMessage()); - } - } - if(gatewayId == null || gatewayId.isEmpty()){ - gatewayId = ServerSettings.getDefaultUserGateway(); - } - return gfacClient.submitJob(experimentID, taskID, gatewayId); - } - } - } - } catch (TException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (KeeperException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (ApplicationSettingsException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (IOException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - }finally { - gfacClient.getOutputProtocol().getTransport().close(); - } - return false; - } - - public boolean terminate(String experimentID, String taskID) throws OrchestratorException { - ZooKeeper zk = orchestratorContext.getZk(); - GfacService.Client localhost = null; - try { - if (zk == null || !zk.getState().isConnected()) { - String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); - synchronized (mutex) { - mutex.wait(); - } - } - String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - List<String> children = zk.getChildren(gfacServer, this); - - if (children.size() == 0) { - // Zookeeper data need cleaning - throw new OrchestratorException("There is no active GFac instance to route the request"); - } else { - String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); - // here we are not using an index because the getChildren does not return the same order everytime - String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); - logger.info("GFAC instance node data: " + gfacNodeData); - String[] split = gfacNodeData.split(":"); - localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); - if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { - // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) { - return localhost.cancelJob(experimentID, taskID); - } - } - } - } catch (TException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (KeeperException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (ApplicationSettingsException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (IOException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); - }finally { - - } - return false; - } - - synchronized public void process(WatchedEvent event) { - synchronized (mutex) { - switch (event.getState()) { - case SyncConnected: - mutex.notify(); - } - switch (event.getType()) { - case NodeCreated: - mutex.notify(); - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java index a8ee98b..0af8881 100644 --- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java +++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java @@ -25,7 +25,7 @@ import org.airavata.appcatalog.cpi.WorkflowCatalog; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher; import org.apache.airavata.model.error.AiravataClientConnectException; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.orchestrator.client.OrchestratorClientFactory; @@ -47,7 +47,7 @@ public class WorkflowEngineImpl implements WorkflowEngine { private Publisher rabbitMQPublisher; WorkflowEngineImpl() { try { - rabbitMQPublisher = new RabbitMQPublisher(); + rabbitMQPublisher = new RabbitMQStatusPublisher(); } catch (Exception e) { logger.error("Failed to instantiate RabbitMQPublisher", e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 82ea635..447bc97 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ <axiom.version>1.2.8</axiom.version> <surefire.version>2.12</surefire.version> <junit.version>4.7</junit.version> + <curator.version>2.7.1</curator.version> <jcr.version>2.0</jcr.version> <xmlbeans.version>2.5.0</xmlbeans.version> <xpp3.version>1.1.6</xpp3.version> @@ -97,6 +98,7 @@ <mysql.connector.version>5.1.31</mysql.connector.version> <skipTests>false</skipTests> <google.gson.version>2.3</google.gson.version> + <zk.version>3.4.0</zk.version> </properties> <developers> @@ -408,6 +410,11 @@ <artifactId>gson</artifactId> <version>${google.gson.version}</version> </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zk.version}</version> + </dependency> </dependencies> </dependencyManagement>
