Repository: airavata
Updated Branches:
  refs/heads/master c05d5e6c8 -> aa27ce109


http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
new file mode 100644
index 0000000..8029a0c
--- /dev/null
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQTaskLaunchPublisher implements Publisher{
+    private final static Logger log = 
LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class);
+    public static final String LAUNCH_TASK = "launch.task";
+    public static final String TERMINATE_TASK = "teminate.task";
+
+    private RabbitMQProducer rabbitMQProducer;
+
+    public RabbitMQTaskLaunchPublisher() throws Exception {
+        String brokerUrl;
+        String exchangeName;
+        try {
+            brokerUrl = 
ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = 
ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from 
airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+        rabbitMQProducer.open();
+    }
+
+    public void publish(MessageContext msgCtx) throws AiravataException {
+        try {
+            log.info("Publishing to lauch queue ...");
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(msgCtx.getMessageId());
+            message.setMessageType(msgCtx.getType());
+            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+            String routingKey = null;
+            if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){
+                TaskSubmitEvent event = (TaskSubmitEvent) msgCtx.getEvent();
+                routingKey = LAUNCH_TASK + "."+event.getExperimentId() + "." +
+                        event.getTaskId() + "." + event.getGatewayId();
+            }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){
+                TaskTerminateEvent event = (TaskTerminateEvent) 
msgCtx.getEvent();
+                routingKey = TERMINATE_TASK + "."+event.getExperimentId() + 
"." +
+                        event.getTaskId();
+            }
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            rabbitMQProducer.send(messageBody, routingKey);
+        } catch (TException e) {
+            String msg = "Error while deserializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e88945d..b200468 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -103,7 +103,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                // registering with zk
                try {
                        if (ServerSettings.isRabbitMqPublishEnabled()) {
-                       publisher = PublisherFactory.createPublisher();
+                       publisher = PublisherFactory.createActivityPublisher();
                }
                        String zkhostPort = AiravataZKUtils.getZKhostPort();
                        String airavataServerHostPort = ServerSettings
@@ -156,6 +156,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                        registry = RegistryFactory.getDefaultRegistry();
                        orchestrator.initialize();
                        orchestrator.getOrchestratorContext().setZk(this.zk);
+                       
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
                } catch (OrchestratorException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing 
orchestrator service", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml 
b/modules/orchestrator/orchestrator-core/pom.xml
index 0dffaee..23863fb 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -121,11 +121,21 @@ the License. -->
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>
         </dependency>
-        <!-- zookeeper dependencies -->
+        <!-- zookeeper and curator dependencies -->
         <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
-               <version>3.4.0</version>
+               <version>${zk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
         </dependency>
     </dependencies>
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 5d465ff..b77087b 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.airavata.gfac.client.GFACInstance;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.zookeeper.ZooKeeper;
@@ -39,6 +40,8 @@ public class OrchestratorContext {
     private Registry newRegistry;
 
     private static ZooKeeper zk; // this instance can be accessed by the 
Validators and other components
+
+    private Publisher publisher;
     
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
         if (gfacInstanceList != null) {
@@ -48,6 +51,14 @@ public class OrchestratorContext {
         }
     }
 
+    /**
+     * Returns the publisher stored in this context.
+     *
+     * @return the publisher last passed to {@link #setPublisher(Publisher)}, or {@code null}
+     *         if none has been set
+     */
+    public Publisher getPublisher() {
+        return publisher;
+    }
+
+    /**
+     * Stores the publisher that components holding this context can retrieve via
+     * {@link #getPublisher()}.
+     *
+     * @param publisher the publisher to store; may be {@code null}
+     */
+    public void setPublisher(Publisher publisher) {
+        this.publisher = publisher;
+    }
+
     public OrchestratorContext() {
         this(null);
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
new file mode 100644
index 0000000..58ac982
--- /dev/null
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.client.GFACInstance;
+import org.apache.airavata.gfac.client.GFacClientFactory;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * This class can be used to do the communication between orchestrator and 
gfac to handle using a queue
+ */
+public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
+    private final static Logger logger = 
LoggerFactory.getLogger(GFACPassiveJobSubmitter.class);
+
+    public static final String IP = "ip";
+
+    private OrchestratorContext orchestratorContext;
+
+    private static Integer mutex = -1;
+
+    private Publisher publisher;
+
+    /**
+     * Stores the orchestrator context and resolves the publisher used to send task messages.
+     *
+     * <p>The publisher already present in the context is reused when there is one; otherwise a
+     * task launch publisher is created through {@link PublisherFactory}.
+     *
+     * @param orchestratorContext context providing ZooKeeper access and, optionally, a publisher
+     * @throws OrchestratorException if no publisher is available and a task launch publisher
+     *                               cannot be created; the original failure is kept as the cause
+     */
+    public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
+        this.orchestratorContext = orchestratorContext;
+        Publisher contextPublisher = orchestratorContext.getPublisher();
+        if (contextPublisher != null) {
+            // use the same publisher; this will be empty if rabbitmq.publish is not enabled in the configuration
+            // NOTE(review): the context publisher is reused as-is for launch messages - confirm it
+            // routes LAUNCHTASK/TERMINATETASK events the way the task launch publisher does.
+            this.publisher = contextPublisher;
+            return;
+        }
+        try {
+            this.publisher = PublisherFactory.createTaskLaunchPublisher();
+        } catch (AiravataException e) {
+            logger.error(e.getMessage(), e);
+            // Keep the cause so the underlying RabbitMQ/configuration failure is not lost.
+            throw new OrchestratorException("Cannot initialize " + GFACPassiveJobSubmitter.class + " need to start Rabbitmq server to use " + GFACPassiveJobSubmitter.class, e);
+        }
+    }
+
+    /**
+     * Placeholder for GFac instance selection; no selection logic is implemented here.
+     *
+     * @return always {@code null}
+     * @throws OrchestratorException declared by the signature; never thrown by this implementation
+     */
+    public GFACInstance selectGFACInstance() throws OrchestratorException {
+        // currently we only support one instance but future we have to pick an
+        // instance
+        return null;
+    }
+
+    /**
+     * Submits the task without a credential token by delegating to
+     * {@link #submit(String, String, String)} with a {@code null} token id.
+     *
+     * @param experimentID id of the experiment
+     * @param taskID id of the task to submit
+     * @return the result of the three-argument {@code submit}
+     * @throws OrchestratorException if the delegated submission fails
+     */
+    public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
+        return submit(experimentID, taskID, null);
+    }
+
+    /**
+     * Submit the job to a shared launch.queue accross multiple gfac instances
+     *
+     * @param experimentID
+     * @param taskID
+     * @param tokenId
+     * @return
+     * @throws OrchestratorException
+     */
+    public boolean submit(String experimentID, String taskID, String tokenId) 
throws OrchestratorException {
+
+        ZooKeeper zk = orchestratorContext.getZk();
+        GfacService.Client gfacClient = null;
+        try {
+            if (zk == null || !zk.getState().isConnected()) {
+                String zkhostPort = AiravataZKUtils.getZKhostPort();
+                zk = new ZooKeeper(zkhostPort, 6000, this);
+                synchronized (mutex) {
+                    mutex.wait();
+                }
+            }
+            String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
+            List<String> children = zk.getChildren(gfacServer, this);
+
+            if (children.size() == 0) {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac 
instance to route the request");
+            } else {
+                String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
+                // here we are not using an index because the getChildren does 
not return the same order everytime
+                String gfacNodeData = new String(zk.getData(gfacServer + 
File.separator + pickedChild, false, null));
+                logger.info("GFAC instance node data: " + gfacNodeData);
+                String[] split = gfacNodeData.split(":");
+                gfacClient = GFacClientFactory.createGFacClient(split[0], 
Integer.parseInt(split[1]));
+                if (zk.exists(gfacServer + File.separator + pickedChild, 
false) != null) {
+                    // before submitting the job we check again the state of 
the node
+                    if (GFacUtils.createExperimentEntry(experimentID, taskID, 
zk, experimentNode, pickedChild, tokenId)) {
+                        String gatewayId = null;
+                        CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
+                        if (credentialReader != null) {
+                            try {
+                                gatewayId = 
credentialReader.getGatewayID(tokenId);
+                            } catch (Exception e) {
+                                logger.error(e.getLocalizedMessage());
+                            }
+                        }
+                        if(gatewayId == null || gatewayId.isEmpty()){
+                            gatewayId = ServerSettings.getDefaultUserGateway();
+                        }
+                        TaskSubmitEvent taskSubmitEvent = new 
TaskSubmitEvent(experimentID, taskID, gatewayId);
+                        MessageContext messageContext = new 
MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TASK-"+ 
UUID.randomUUID().toString(),gatewayId);
+                        publisher.publish(messageContext);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (KeeperException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (ApplicationSettingsException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        }finally {
+            gfacClient.getOutputProtocol().getTransport().close();
+        }
+        return true;
+
+    }
+
+    /**
+     * Publishes a terminate request for the given task.
+     *
+     * <p>Picks a random child of the GFac server node in ZooKeeper and, if that node still exists
+     * and {@code GFacUtils.createExperimentEntry} returns {@code true}, publishes a
+     * {@code TaskTerminateEvent} with message type {@link MessageType#TERMINATETASK}.
+     *
+     * @param experimentID id of the experiment
+     * @param taskID id of the task to terminate
+     * @return {@code true} if the terminate message was published, {@code false} otherwise
+     * @throws OrchestratorException wrapping any failure, e.g. when no GFac instance is registered
+     */
+    public boolean terminate(String experimentID, String taskID) throws OrchestratorException {
+        ZooKeeper zk = orchestratorContext.getZk();
+        GfacService.Client gfacClient = null;
+        boolean published = false;
+        try {
+            if (zk == null || !zk.getState().isConnected()) {
+                String zkhostPort = AiravataZKUtils.getZKhostPort();
+                zk = new ZooKeeper(zkhostPort, 6000, this);
+                synchronized (mutex) {
+                    mutex.wait();
+                }
+            }
+            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            List<String> children = zk.getChildren(gfacServer, this);
+
+            if (children.isEmpty()) {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac instance to route the request");
+            }
+            // here we are not using an index because the getChildren does not return the same order everytime
+            String pickedChild = children.get(new Random().nextInt(children.size()));
+            // ZooKeeper paths always use '/', independent of the platform file separator.
+            String pickedNodePath = gfacServer + "/" + pickedChild;
+            String gfacNodeData = new String(zk.getData(pickedNodePath, false, null));
+            logger.info("GFAC instance node data: " + gfacNodeData);
+            String[] split = gfacNodeData.split(":");
+            gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+            // before publishing we check again the state of the node
+            if (zk.exists(pickedNodePath, false) != null
+                    && GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+                // The previous code sent a TaskSubmitEvent with LAUNCHTASK here, which the task
+                // launch publisher routes as a launch instead of a termination.
+                org.apache.airavata.model.messaging.event.TaskTerminateEvent taskTerminateEvent =
+                        new org.apache.airavata.model.messaging.event.TaskTerminateEvent();
+                taskTerminateEvent.setExperimentId(experimentID);
+                taskTerminateEvent.setTaskId(taskID);
+                MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), null);
+                publisher.publish(messageContext);
+                published = true;
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers up the stack can still observe it
+            Thread.currentThread().interrupt();
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } finally {
+            // release the client the same way submit() does; it was never closed before
+            if (gfacClient != null) {
+                gfacClient.getOutputProtocol().getTransport().close();
+            }
+        }
+        return published;
+    }
+
+    /**
+     * ZooKeeper watcher callback: wakes one thread waiting on the shared mutex when the session
+     * reports {@code SyncConnected}, and again when the event type is {@code NodeCreated}.
+     *
+     * @param event the event delivered by ZooKeeper
+     */
+    public synchronized void process(WatchedEvent event) {
+        synchronized (mutex) {
+            // connection state part of the event
+            switch (event.getState()) {
+                case SyncConnected:
+                    mutex.notify();
+                    break;
+                default:
+                    break;
+            }
+            // node change part of the event
+            switch (event.getType()) {
+                case NodeCreated:
+                    mutex.notify();
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
new file mode 100644
index 0000000..54339a2
--- /dev/null
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.orchestrator.core.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.client.GFACInstance;
+import org.apache.airavata.gfac.client.GFacClientFactory;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * this class is responsible for submitting a job to gfac in service mode,
+ * it will select a gfac instance based on the incoming request and submit to 
that
+ * gfac instance.
+ */
+public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
+       private final static Logger logger = 
LoggerFactory.getLogger(GFACRPCJobSubmitter.class);
+       public static final String IP = "ip";
+
+       private OrchestratorContext orchestratorContext;
+
+       private static Integer mutex = -1;
+
+       /**
+        * Stores the orchestrator context used later to obtain the ZooKeeper handle.
+        *
+        * @param orchestratorContext context kept by this submitter
+        * @throws OrchestratorException declared by the signature; never thrown by this implementation
+        */
+       public void initialize(OrchestratorContext orchestratorContext) throws 
OrchestratorException {
+               this.orchestratorContext = orchestratorContext;
+       }
+
+       /**
+        * Placeholder for GFac instance selection; no selection logic is implemented here.
+        *
+        * @return always {@code null}
+        * @throws OrchestratorException declared by the signature; never thrown by this implementation
+        */
+       public GFACInstance selectGFACInstance() throws OrchestratorException {
+               // currently we only support one instance; in the future we have to
+               // pick an instance
+               return null;
+       }
+
+       /**
+        * Submits the task without a credential token by delegating to
+        * {@link #submit(String, String, String)} with a {@code null} token id.
+        *
+        * @param experimentID id of the experiment
+        * @param taskID id of the task to submit
+        * @return the result of the three-argument {@code submit}
+        * @throws OrchestratorException if the delegated submission fails
+        */
+       public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
+               return this.submit(experimentID, taskID, null);
+       }
+
+       public boolean submit(String experimentID, String taskID, String 
tokenId) throws OrchestratorException {
+               ZooKeeper zk = orchestratorContext.getZk();
+        GfacService.Client gfacClient = null;
+               try {
+                       if (zk == null || !zk.getState().isConnected()) {
+                               String zkhostPort = 
AiravataZKUtils.getZKhostPort();
+                               zk = new ZooKeeper(zkhostPort, 6000, this);
+                               synchronized (mutex) {
+                                       mutex.wait();
+                               }
+                       }
+                       String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+                       String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
+                       List<String> children = zk.getChildren(gfacServer, 
this);
+                       
+                       if (children.size() == 0) {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac 
instance to route the request");
+            } else {
+                               String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
+                               // here we are not using an index because the 
getChildren does not return the same order everytime
+                               String gfacNodeData = new 
String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
+                               logger.info("GFAC instance node data: " + 
gfacNodeData);
+                               String[] split = gfacNodeData.split(":");
+                               gfacClient = 
GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+                               if (zk.exists(gfacServer + File.separator + 
pickedChild, false) != null) {
+                                       // before submitting the job we check 
again the state of the node
+                                       if 
(GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, 
pickedChild, tokenId)) {
+                                                String gatewayId = null;
+                        CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
+                         if (credentialReader != null) {
+                             try {
+                                gatewayId = 
credentialReader.getGatewayID(tokenId);
+                             } catch (Exception e) {
+                                 logger.error(e.getLocalizedMessage());
+                             }
+                         }
+                        if(gatewayId == null || gatewayId.isEmpty()){
+                         gatewayId = ServerSettings.getDefaultUserGateway();
+                        }
+                                               return 
gfacClient.submitJob(experimentID, taskID, gatewayId);
+                                       }
+                               }
+                       }
+               } catch (TException e) {
+            logger.error(e.getMessage(), e);
+                       throw new OrchestratorException(e);
+               } catch (InterruptedException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+               } catch (KeeperException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+               } catch (ApplicationSettingsException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+               } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+               } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+               }finally {
+            gfacClient.getOutputProtocol().getTransport().close();
+        }
+        return false;
+       }
+
+    /**
+     * Asks a randomly chosen GFac instance to cancel the given task over RPC.
+     *
+     * <p>Picks a random child of the GFac server node in ZooKeeper, creates a client from the
+     * {@code host:port} data stored on that node and, if the node still exists and
+     * {@code GFacUtils.createExperimentEntry} returns {@code true}, calls {@code cancelJob}.
+     *
+     * @param experimentID id of the experiment
+     * @param taskID id of the task to cancel
+     * @return the result of {@code cancelJob}, or {@code false} when no cancel request was sent
+     * @throws OrchestratorException wrapping any failure, e.g. when no GFac instance is registered
+     */
+    public boolean terminate(String experimentID, String taskID) throws OrchestratorException {
+        ZooKeeper zk = orchestratorContext.getZk();
+        GfacService.Client gfacClient = null;
+        try {
+            if (zk == null || !zk.getState().isConnected()) {
+                String zkhostPort = AiravataZKUtils.getZKhostPort();
+                zk = new ZooKeeper(zkhostPort, 6000, this);
+                synchronized (mutex) {
+                    mutex.wait();
+                }
+            }
+            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            List<String> children = zk.getChildren(gfacServer, this);
+
+            if (children.isEmpty()) {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac instance to route the request");
+            }
+            // here we are not using an index because the getChildren does not return the same order everytime
+            String pickedChild = children.get(new Random().nextInt(children.size()));
+            // ZooKeeper paths always use '/', independent of the platform file separator.
+            String pickedNodePath = gfacServer + "/" + pickedChild;
+            String gfacNodeData = new String(zk.getData(pickedNodePath, false, null));
+            logger.info("GFAC instance node data: " + gfacNodeData);
+            String[] split = gfacNodeData.split(":");
+            gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+            // before cancelling the job we check again the state of the node
+            if (zk.exists(pickedNodePath, false) != null
+                    && GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+                return gfacClient.cancelJob(experimentID, taskID);
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers up the stack can still observe it
+            Thread.currentThread().interrupt();
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new OrchestratorException(e);
+        } finally {
+            // release the client the same way submit() does; it was never closed before
+            if (gfacClient != null) {
+                gfacClient.getOutputProtocol().getTransport().close();
+            }
+        }
+        return false;
+    }
+
+    /**
+     * ZooKeeper watcher callback: wakes one thread waiting on the shared mutex when the session
+     * reports {@code SyncConnected}, and again when the event type is {@code NodeCreated}.
+     *
+     * @param event the event delivered by ZooKeeper
+     */
+    public synchronized void process(WatchedEvent event) {
+        synchronized (mutex) {
+            // connection state part of the event
+            switch (event.getState()) {
+                case SyncConnected:
+                    mutex.notify();
+                    break;
+                default:
+                    break;
+            }
+            // node change part of the event
+            switch (event.getType()) {
+                case NodeCreated:
+                    mutex.notify();
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
deleted file mode 100644
index 3bbc588..0000000
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.orchestrator.core.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.client.GFacClientFactory;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * this class is responsible for submitting a job to gfac in service mode,
- * it will select a gfac instance based on the incoming request and submit to 
that
- * gfac instance.
- */
-public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
-       private final static Logger logger = 
LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
-       public static final String IP = "ip";
-
-       private OrchestratorContext orchestratorContext;
-
-       private static Integer mutex = -1;
-
-       public void initialize(OrchestratorContext orchestratorContext) throws 
OrchestratorException {
-               this.orchestratorContext = orchestratorContext;
-       }
-
-       public GFACInstance selectGFACInstance() throws OrchestratorException {
-               // currently we only support one instance but future we have to 
pick an
-               // instance
-               return null;
-       }
-
-       public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
-               return this.submit(experimentID, taskID, null);
-       }
-
-       public boolean submit(String experimentID, String taskID, String 
tokenId) throws OrchestratorException {
-               ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client gfacClient = null;
-               try {
-                       if (zk == null || !zk.getState().isConnected()) {
-                               String zkhostPort = 
AiravataZKUtils.getZKhostPort();
-                               zk = new ZooKeeper(zkhostPort, 6000, this);
-                               synchronized (mutex) {
-                                       mutex.wait();
-                               }
-                       }
-                       String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-                       String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
-                       List<String> children = zk.getChildren(gfacServer, 
this);
-                       
-                       if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac 
instance to route the request");
-            } else {
-                               String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
-                               // here we are not using an index because the 
getChildren does not return the same order everytime
-                               String gfacNodeData = new 
String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
-                               logger.info("GFAC instance node data: " + 
gfacNodeData);
-                               String[] split = gfacNodeData.split(":");
-                               gfacClient = 
GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-                               if (zk.exists(gfacServer + File.separator + 
pickedChild, false) != null) {
-                                       // before submitting the job we check 
again the state of the node
-                                       if 
(GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, 
pickedChild, tokenId)) {
-                                                String gatewayId = null;
-                        CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
-                         if (credentialReader != null) {
-                             try {
-                                gatewayId = 
credentialReader.getGatewayID(tokenId);
-                             } catch (Exception e) {
-                                 logger.error(e.getLocalizedMessage());
-                             }
-                         }
-                        if(gatewayId == null || gatewayId.isEmpty()){
-                         gatewayId = ServerSettings.getDefaultUserGateway();
-                        }
-                                               return 
gfacClient.submitJob(experimentID, taskID, gatewayId);
-                                       }
-                               }
-                       }
-               } catch (TException e) {
-            logger.error(e.getMessage(), e);
-                       throw new OrchestratorException(e);
-               } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               }finally {
-            gfacClient.getOutputProtocol().getTransport().close();
-        }
-        return false;
-       }
-
-    public boolean terminate(String experimentID, String taskID) throws 
OrchestratorException {
-        ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client localhost = null;
-        try {
-            if (zk == null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, 6000, this);
-                synchronized (mutex) {
-                    mutex.wait();
-                }
-            }
-            String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-            String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
-            List<String> children = zk.getChildren(gfacServer, this);
-
-            if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac 
instance to route the request");
-            } else {
-                String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
-                // here we are not using an index because the getChildren does 
not return the same order everytime
-                String gfacNodeData = new String(zk.getData(gfacServer + 
File.separator + pickedChild, false, null));
-                logger.info("GFAC instance node data: " + gfacNodeData);
-                String[] split = gfacNodeData.split(":");
-                localhost = GFacClientFactory.createGFacClient(split[0], 
Integer.parseInt(split[1]));
-                if (zk.exists(gfacServer + File.separator + pickedChild, 
false) != null) {
-                    // before submitting the job we check again the state of 
the node
-                    if (GFacUtils.createExperimentEntry(experimentID, taskID, 
zk, experimentNode, pickedChild, null)) {
-                        return localhost.cancelJob(experimentID, taskID);
-                    }
-                }
-            }
-        } catch (TException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        }finally {
-
-        }
-        return false;
-    }
-
-    synchronized public void process(WatchedEvent event) {
-               synchronized (mutex) {
-                       switch (event.getState()) {
-                       case SyncConnected:
-                               mutex.notify();
-                       }
-                       switch (event.getType()) {
-                       case NodeCreated:
-                               mutex.notify();
-                               break;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java
index a8ee98b..0af8881 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowEngineImpl.java
@@ -25,7 +25,7 @@ import org.airavata.appcatalog.cpi.WorkflowCatalog;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
 import org.apache.airavata.model.error.AiravataClientConnectException;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.orchestrator.client.OrchestratorClientFactory;
@@ -47,7 +47,7 @@ public class WorkflowEngineImpl implements WorkflowEngine {
     private Publisher rabbitMQPublisher;
     WorkflowEngineImpl() {
         try {
-            rabbitMQPublisher = new RabbitMQPublisher();
+            rabbitMQPublisher = new RabbitMQStatusPublisher();
         } catch (Exception e) {
             logger.error("Failed to instantiate RabbitMQPublisher", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 82ea635..447bc97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
                <axiom.version>1.2.8</axiom.version>
                <surefire.version>2.12</surefire.version>
                <junit.version>4.7</junit.version>
+               <curator.version>2.7.1</curator.version>
                <jcr.version>2.0</jcr.version>
                <xmlbeans.version>2.5.0</xmlbeans.version>
                <xpp3.version>1.1.6</xpp3.version>
@@ -97,6 +98,7 @@
                <mysql.connector.version>5.1.31</mysql.connector.version>
                <skipTests>false</skipTests>
         <google.gson.version>2.3</google.gson.version>
+               <zk.version>3.4.0</zk.version>
        </properties>
 
        <developers>
@@ -408,6 +410,11 @@
                 <artifactId>gson</artifactId>
                 <version>${google.gson.version}</version>
             </dependency>
+                       <dependency>
+                               <groupId>org.apache.zookeeper</groupId>
+                               <artifactId>zookeeper</artifactId>
+                               <version>${zk.version}</version>
+                       </dependency>
                </dependencies>
        </dependencyManagement>
 

Reply via email to