Author: lahiru
Date: Wed Jan 8 20:41:13 2014
New Revision: 1556631
URL: http://svn.apache.org/r1556631
Log:
adding jobcreation and jobaccepting logic.
Modified:
airavata/sandbox/orchestrator/orchestrator-core/pom.xml
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
Modified: airavata/sandbox/orchestrator/orchestrator-core/pom.xml
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/pom.xml?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
--- airavata/sandbox/orchestrator/orchestrator-core/pom.xml (original)
+++ airavata/sandbox/orchestrator/orchestrator-core/pom.xml Wed Jan 8 20:41:13
2014
@@ -36,6 +36,11 @@ the License. -->
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-jpa-registry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>airavata-common-utils</artifactId>
<version>${project.version}</version>
</dependency>
Modified:
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
---
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
(original)
+++
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
Wed Jan 8 20:41:13 2014
@@ -25,11 +25,13 @@ import org.apache.airavata.orchestrator.
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
+import org.apache.airavata.registry.api.exception.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
+import java.util.List;
import java.util.Properties;
public class JobSubmitterWorker implements Runnable {
@@ -64,6 +66,7 @@ public class JobSubmitterWorker implemen
public void run() {
/* implement logic to submit job batches time to time */
+ int idleCount = 0;
while (true) {
try {
Thread.sleep(submitInterval);
@@ -75,7 +78,34 @@ public class JobSubmitterWorker implemen
GFAC instance, we do not handle job by job submission to each
gfac instance
*/
GFACInstance gfacInstance =
jobSubmitter.selectGFACInstance(orchestratorContext);
- jobSubmitter.submitJob(gfacInstance);
+
+ // Now we have picked a gfac instance to submit set of jobs at
this time, now its time to
+ // select what are the jobs available to submit
+
+ try {
+ List<String> allAcceptedJobs =
orchestratorContext.getRegistry().getAllAcceptedJobs();
+ List<String> allHangedJobs =
orchestratorContext.getRegistry().getAllHangedJobs();
+ if (allAcceptedJobs.size() == 0) {
+ idleCount++;
+
+ if (idleCount == 10) {
+ try {
+ Thread.sleep(submitInterval*2);
+ } catch (InterruptedException e) {
+ logger.error("Error in JobSubmitter during
sleeping process before submit jobs");
+ e.printStackTrace();
+ }
+ idleCount=0;
+ }
+ continue;
+ }
+ jobSubmitter.submitJob(gfacInstance,allAcceptedJobs);
+
+ /* After submitting available jobs try to schedule again and
then submit*/
+
jobSubmitter.submitJob(jobSubmitter.selectGFACInstance(orchestratorContext),allHangedJobs);
+ } catch (RegistryException e) {
+ logger.error("Error while trying to retrieve available ");
+ }
}
}
Modified:
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
---
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
(original)
+++
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/Orchestrator.java
Wed Jan 8 20:41:13 2014
@@ -28,31 +28,47 @@ import org.apache.airavata.orchestrator.
*/
public interface Orchestrator {
- /** This method will initialize the Orchestrator, during restart this will
+
+ /**
+ * This method will initialize the Orchestrator, during restart this will
* get called and do init tasks
* @return
+ * @throws OrchestratorException
*/
boolean initialize() throws OrchestratorException;
+
+
/**
- *
+ * This method is the very first method which create an entry in
+ * database for a given experiment, this return the experiment ID, so
+ * user have full control for the experiment
+ * @param request
* @return
+ * @throws OrchestratorException
*/
- String createExperiment(ExperimentRequest request)throws
OrchestratorException;
+ String createExperiment(ExperimentRequest request) throws
OrchestratorException;
/**
- *
+ * After creating the experiment user has the experimentID, then user
+ * can create the JobRequest and send the Job input parameters to
Orchestrator
+ * @param request
* @return
+ * @throws OrchestratorException
*/
- boolean acceptExperiment(JobRequest request)throws OrchestratorException;
+ boolean acceptExperiment(JobRequest request) throws OrchestratorException;
/**
- *
+ * This is like a cron job which runs continuously and take available jobs
to
+ * submit to GFAC and submit them to GFAC
+ * @throws OrchestratorException
*/
- void startJobSubmitter()throws OrchestratorException;
+ void startJobSubmitter() throws OrchestratorException;
- /*
- This method will get called during graceful shutdown of Orchestrator
- This can be used to handle the shutdown of orchestrator gracefully.
+ /**
+ * This method will get called during graceful shutdown of Orchestrator
+ * This can be used to handle the shutdown of orchestrator gracefully.
+ * @return
+ * @throws OrchestratorException
*/
- boolean shutdown()throws OrchestratorException;
+ void shutdown() throws OrchestratorException;
}
Modified:
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
---
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
(original)
+++
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
Wed Jan 8 20:41:13 2014
@@ -21,11 +21,13 @@
package org.apache.airavata.orchestrator.core;
import org.apache.airavata.common.exception.AiravataConfigurationException;
+import org.apache.airavata.common.utils.AiravataJobState;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.persistance.registry.jpa.impl.AiravataJPARegistry;
import org.apache.airavata.registry.api.*;
import org.apache.airavata.registry.api.exception.RegistryException;
import org.slf4j.Logger;
@@ -43,9 +45,9 @@ public class PullBasedOrchestrator imple
OrchestratorContext orchestratorContext;
- OrchestratorRegistry airavataRegistry;
+ AiravataRegistry2 airavataRegistry;
- Executor executor;
+ ExecutorService executor;
public boolean initialize() throws OrchestratorException {
try {
@@ -94,8 +96,9 @@ public class PullBasedOrchestrator imple
}
- public boolean shutdown() throws OrchestratorException {
- return false; //To change body of implemented methods use File |
Settings | File Templates.
+ public void shutdown() throws OrchestratorException {
+ executor.shutdown();
+
}
//todo decide whether to return an error or do what
@@ -105,6 +108,7 @@ public class PullBasedOrchestrator imple
String username = request.getUserName();
try {
airavataRegistry.storeExperiment(username, experimentID);
+ airavataRegistry.changeStatus(username, experimentID,
AiravataJobState.State.CREATED);
} catch (RegistryException e) {
//todo put more meaningful error message
logger.error("Failed to create experiment for the request from " +
request.getUserName());
@@ -122,7 +126,7 @@ public class PullBasedOrchestrator imple
String experimentID = request.getExperimentID();
String username = request.getUserName();
try {
- airavataRegistry.storeExperiment(username, experimentID);
+ airavataRegistry.changeStatus(username, experimentID,
AiravataJobState.State.ACCEPTED);
} catch (RegistryException e) {
//todo put more meaningful error message
logger.error("Failed to create experiment for the request from " +
request.getUserName());
Modified:
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
---
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
(original)
+++
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
Wed Jan 8 20:41:13 2014
@@ -22,6 +22,8 @@ package org.apache.airavata.orchestrator
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.api.OrchestratorRegistry;
import java.util.ArrayList;
import java.util.List;
@@ -31,6 +33,8 @@ public class OrchestratorContext {
private OrchestratorConfiguration orchestratorConfiguration;
+ private AiravataRegistry2 registry;
+
public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
this.gfacInstanceList = new ArrayList<GFACInstance>();
}
@@ -50,4 +54,12 @@ public class OrchestratorContext {
public void setOrchestratorConfiguration(OrchestratorConfiguration
orchestratorConfiguration) {
this.orchestratorConfiguration = orchestratorConfiguration;
}
+
+ public OrchestratorRegistry getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(AiravataRegistry2 registry) {
+ this.registry = registry;
+ }
}
Modified:
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java?rev=1556631&r1=1556630&r2=1556631&view=diff
==============================================================================
---
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
(original)
+++
airavata/sandbox/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
Wed Jan 8 20:41:13 2014
@@ -23,6 +23,8 @@ package org.apache.airavata.orchestrator
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
+import java.util.List;
+
public interface JobSubmitter {
/**
@@ -35,5 +37,5 @@ public interface JobSubmitter {
* @param gfac
* @return
*/
- boolean submitJob(GFACInstance gfac);
+ boolean submitJob(GFACInstance gfac,List<String> experimentIDList);
}