Author: lahiru
Date: Wed Jan  8 20:41:13 2014
New Revision: 1556631

URL: http://svn.apache.org/r1556631
Log:
Adding job creation and job accepting logic.

Modified:
    airavata/sandbox/orchestrator/orchestrator-core/pom.xml
    
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
    
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
    
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
    
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
    
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java

Modified: airavata/sandbox/orchestrator/orchestrator-core/pom.xml
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/pom.xml?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- airavata/sandbox/orchestrator/orchestrator-core/pom.xml (original)
+++ airavata/sandbox/orchestrator/orchestrator-core/pom.xml Wed Jan  8 20:41:13 
2014
@@ -36,6 +36,11 @@ the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-jpa-registry</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-common-utils</artifactId>
             <version>${project.version}</version>
         </dependency>

Modified: 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
 (original)
+++ 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
 Wed Jan  8 20:41:13 2014
@@ -25,11 +25,13 @@ import org.apache.airavata.orchestrator.
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
+import org.apache.airavata.registry.api.exception.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.List;
 import java.util.Properties;
 
 public class JobSubmitterWorker implements Runnable {
@@ -64,6 +66,7 @@ public class JobSubmitterWorker implemen
 
     public void run() {
         /* implement logic to submit job batches time to time */
+        int idleCount = 0;
         while (true) {
             try {
                 Thread.sleep(submitInterval);
@@ -75,7 +78,34 @@ public class JobSubmitterWorker implemen
               GFAC instance, we do not handle job by job submission to each 
gfac instance
             */
             GFACInstance gfacInstance = 
jobSubmitter.selectGFACInstance(orchestratorContext);
-            jobSubmitter.submitJob(gfacInstance);
+
+            // Now we have picked a gfac instance to submit set of jobs at 
this time, now its time to
+            // select what are the jobs available to submit
+
+            try {
+                List<String> allAcceptedJobs = 
orchestratorContext.getRegistry().getAllAcceptedJobs();
+                List<String> allHangedJobs = 
orchestratorContext.getRegistry().getAllHangedJobs();
+                if (allAcceptedJobs.size() == 0) {
+                    idleCount++;
+
+                    if (idleCount == 10) {
+                        try {
+                            Thread.sleep(submitInterval*2);
+                        } catch (InterruptedException e) {
+                            logger.error("Error in JobSubmitter during 
sleeping process before submit jobs");
+                            e.printStackTrace();
+                        }
+                        idleCount=0;
+                    }
+                    continue;
+                }
+                jobSubmitter.submitJob(gfacInstance,allAcceptedJobs);
+
+                /* After submitting available jobs try to schedule again and 
then submit*/
+                
jobSubmitter.submitJob(jobSubmitter.selectGFACInstance(orchestratorContext),allHangedJobs);
+            } catch (RegistryException e) {
+                logger.error("Error while trying to retrieve available ");
+            }
         }
     }
 

Modified: 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
 (original)
+++ 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
 Wed Jan  8 20:41:13 2014
@@ -28,31 +28,47 @@ import org.apache.airavata.orchestrator.
 */
 public interface Orchestrator {
 
-    /** This method will initialize the Orchestrator, during restart this will
+
+    /**
+     * This method will initialize the Orchestrator, during restart this will
      * get called and do init tasks
      * @return
+     * @throws OrchestratorException
      */
     boolean initialize() throws OrchestratorException;
+
+
     /**
-     *
+     *  This method is the very first method which create an entry in
+     * database for a given experiment, this return the experiment ID, so
+     * user have full control for the experiment
+     * @param request
      * @return
+     * @throws OrchestratorException
      */
-    String createExperiment(ExperimentRequest request)throws 
OrchestratorException;
+    String createExperiment(ExperimentRequest request) throws 
OrchestratorException;
 
     /**
-     *
+     * After creating the experiment user has the experimentID, then user
+     * can create the JobRequest and send the Job input parameters to 
Orchestrator
+     * @param request
      * @return
+     * @throws OrchestratorException
      */
-    boolean acceptExperiment(JobRequest request)throws OrchestratorException;
+    boolean acceptExperiment(JobRequest request) throws OrchestratorException;
 
     /**
-     *
+     * This is like a cron job which runs continuously and take available jobs 
to
+     * submit to GFAC and submit them to GFAC
+     * @throws OrchestratorException
      */
-    void startJobSubmitter()throws OrchestratorException;
+    void startJobSubmitter() throws OrchestratorException;
 
-    /*
-    This method will get called during graceful shutdown of Orchestrator
-    This can be used to handle the shutdown of orchestrator gracefully.
+    /**
+     * This method will get called during graceful shutdown of Orchestrator
+     * This can be used to handle the shutdown of orchestrator gracefully.
+     * @return
+     * @throws OrchestratorException
      */
-    boolean shutdown()throws OrchestratorException;
+    void shutdown() throws OrchestratorException;
 }

Modified: 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
 (original)
+++ 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
 Wed Jan  8 20:41:13 2014
@@ -21,11 +21,13 @@
 package org.apache.airavata.orchestrator.core;
 
 import org.apache.airavata.common.exception.AiravataConfigurationException;
+import org.apache.airavata.common.utils.AiravataJobState;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.persistance.registry.jpa.impl.AiravataJPARegistry;
 import org.apache.airavata.registry.api.*;
 import org.apache.airavata.registry.api.exception.RegistryException;
 import org.slf4j.Logger;
@@ -43,9 +45,9 @@ public class PullBasedOrchestrator imple
 
     OrchestratorContext orchestratorContext;
 
-    OrchestratorRegistry airavataRegistry;
+    AiravataRegistry2 airavataRegistry;
 
-    Executor executor;
+    ExecutorService executor;
 
     public boolean initialize() throws OrchestratorException {
         try {
@@ -94,8 +96,9 @@ public class PullBasedOrchestrator imple
     }
 
 
-    public boolean shutdown() throws OrchestratorException {
-        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
+    public void shutdown() throws OrchestratorException {
+        executor.shutdown();
+
     }
 
     //todo decide whether to return an error or do what
@@ -105,6 +108,7 @@ public class PullBasedOrchestrator imple
         String username = request.getUserName();
         try {
             airavataRegistry.storeExperiment(username, experimentID);
+            airavataRegistry.changeStatus(username, experimentID, 
AiravataJobState.State.CREATED);
         } catch (RegistryException e) {
             //todo put more meaningful error message
             logger.error("Failed to create experiment for the request from " + 
request.getUserName());
@@ -122,7 +126,7 @@ public class PullBasedOrchestrator imple
         String experimentID = request.getExperimentID();
         String username = request.getUserName();
         try {
-            airavataRegistry.storeExperiment(username, experimentID);
+            airavataRegistry.changeStatus(username, experimentID, 
AiravataJobState.State.ACCEPTED);
         } catch (RegistryException e) {
             //todo put more meaningful error message
             logger.error("Failed to create experiment for the request from " + 
request.getUserName());

Modified: 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
 (original)
+++ 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
 Wed Jan  8 20:41:13 2014
@@ -22,6 +22,8 @@ package org.apache.airavata.orchestrator
 
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.api.OrchestratorRegistry;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +33,8 @@ public class OrchestratorContext {
 
     private OrchestratorConfiguration orchestratorConfiguration;
 
+    private AiravataRegistry2 registry;
+
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
         this.gfacInstanceList = new ArrayList<GFACInstance>();
     }
@@ -50,4 +54,12 @@ public class OrchestratorContext {
     public void setOrchestratorConfiguration(OrchestratorConfiguration 
orchestratorConfiguration) {
         this.orchestratorConfiguration = orchestratorConfiguration;
     }
+
+    public OrchestratorRegistry getRegistry() {
+        return registry;
+    }
+
+    public void setRegistry(AiravataRegistry2 registry) {
+        this.registry = registry;
+    }
 }

Modified: 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
URL: 
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
 (original)
+++ 
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
 Wed Jan  8 20:41:13 2014
@@ -23,6 +23,8 @@ package org.apache.airavata.orchestrator
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 
+import java.util.List;
+
 public interface JobSubmitter {
 
     /**
@@ -35,5 +37,5 @@ public interface JobSubmitter {
      * @param gfac
      * @return
      */
-    boolean submitJob(GFACInstance gfac);
+    boolean submitJob(GFACInstance gfac,List<String> experimentIDList);
 }


Reply via email to