Author: mattmann
Date: Sun Jun 19 18:25:41 2011
New Revision: 1137402
URL: http://svn.apache.org/viewvc?rev=1137402&view=rev
Log:
- fix for OODT-279 Make Resource Manager FAILURE and SUCCESS aware instead of
just COMPLETE aware
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java
Modified: oodt/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Sun Jun 19 18:25:41 2011
@@ -4,7 +4,11 @@ Apache OODT Change Log
Release 0.3-SNAPSHOT (in progress)
--------------------------------------------
-* OODT-278 CAS-PGE returns success even if product file(s) fail to ingest
(mattmann, bfoster)
+* OODT-279 Make Resource Manager FAILURE and SUCCESS aware instead of just
+ COMPLETE aware (mattmann, bfoster)
+
+* OODT-278 CAS-PGE returns success even if product file(s) fail to
+ ingest (mattmann, bfoster)
* OODT-243 Add method is called on already existing jobspec (mattmann, bfoster)
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
Sun Jun 19 18:25:41 2011
@@ -149,8 +149,29 @@ public class XmlRpcBatchMgr implements B
}
}
- protected void jobComplete(JobSpec spec) {
- spec.getJob().setStatus(JobStatus.COMPLETE);
+ protected void jobSuccess(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.SUCCESS);
+ synchronized (this.nodeToJobMap) {
+ this.nodeToJobMap.remove(spec.getJob().getId());
+ }
+ synchronized (this.specToProxyMap) {
+ XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy)
this.specToProxyMap
+ .remove(spec.getJob().getId());
+ if (proxy != null) {
+ proxy = null;
+ }
+ }
+
+ try {
+ repo.updateJob(spec);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING, "Error set job completion status for job: ["
+ + spec.getJob().getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+ protected void jobFailure(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.FAILURE);
synchronized (this.nodeToJobMap) {
this.nodeToJobMap.remove(spec.getJob().getId());
}
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
Sun Jun 19 18:25:41 2011
@@ -21,6 +21,8 @@ package org.apache.oodt.cas.resource.bat
//JDK imports
import java.io.IOException;
import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
//OODT imports
import org.apache.oodt.cas.resource.structs.Job;
@@ -46,6 +48,8 @@ import org.apache.xmlrpc.XmlRpcException
*/
public class XmlRpcBatchMgrProxy extends Thread implements Runnable {
+ private static final Logger LOG =
Logger.getLogger(XmlRpcBatchMgrProxy.class.getName());
+
private JobSpec jobSpec;
private ResourceNode remoteHost;
@@ -115,19 +119,17 @@ public class XmlRpcBatchMgrProxy extends
parent.jobExecuting(jobSpec);
result = ((Boolean) client
.execute("batchstub.executeJob", argList)).booleanValue();
- } catch (XmlRpcException e) {
- e.printStackTrace();
- // throw new JobExecutionException(e);
- } catch (IOException e) {
- e.printStackTrace();
- // throw new JobExecutionException(e);
+ if (result)
+ parent.jobSuccess(jobSpec);
+ else
+ throw new Exception("batchstub.executeJob returned false");
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Job execution failed for jobId '" +
jobSpec.getJob().getId() + "' : " + e.getMessage(), e);
+ parent.jobFailure(jobSpec);
+ }finally {
+ parent.notifyMonitor(remoteHost, jobSpec);
}
- // notify the monitor job has finished;
- parent.notifyMonitor(remoteHost, jobSpec);
-
- // notify the job repository that the job has finished
- parent.jobComplete(jobSpec);
- }
+ }
}
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
Sun Jun 19 18:25:41 2011
@@ -80,7 +80,8 @@ public class MemoryJobRepository impleme
*/
public boolean jobFinished(JobSpec spec) throws JobRepositoryException {
JobSpec persistedSpec = (JobSpec) jobMap.get(spec.getJob().getId());
- return persistedSpec.getJob().getStatus().equals(JobStatus.COMPLETE);
+ return persistedSpec.getJob().getStatus().equals(JobStatus.SUCCESS)
+ || persistedSpec.getJob().getStatus().equals(JobStatus.FAILURE);
}
/*
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
Sun Jun 19 18:25:41 2011
@@ -107,7 +107,7 @@ public class XStreamJobRepository implem
public boolean jobFinished(JobSpec spec) throws JobRepositoryException {
String status = this.getStatus(spec);
- return status.equals(JobStatus.COMPLETE);
+ return status.equals(JobStatus.SUCCESS);
}
public synchronized void removeJob(JobSpec spec) throws
JobRepositoryException {
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
Sun Jun 19 18:25:41 2011
@@ -32,7 +32,9 @@ public interface JobStatus {
public static final String SCHEDULED = "__Scheduled__";
- public static final String COMPLETE = "__Complete__";
+ public static final String SUCCESS = "__Success__";
+
+ public static final String FAILURE = "__Failure__";
public static final String KILLED = "__Killed__";
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
Sun Jun 19 18:25:41 2011
@@ -483,9 +483,9 @@ public class XmlRpcResourceManagerClient
complete = ((Boolean) client.execute("resourcemgr.isJobComplete",
argList)).booleanValue();
} catch (XmlRpcException e) {
- throw new JobRepositoryException(e.getMessage());
+ throw new JobRepositoryException(e.getMessage(), e);
} catch (IOException e) {
- throw new JobRepositoryException(e.getMessage());
+ throw new JobRepositoryException(e.getMessage(), e);
}
return complete;
@@ -501,9 +501,9 @@ public class XmlRpcResourceManagerClient
jobHash = (Hashtable) client.execute("resourcemgr.getJobInfo",
argList);
} catch (XmlRpcException e) {
- throw new JobRepositoryException(e.getMessage());
+ throw new JobRepositoryException(e.getMessage(), e);
} catch (IOException e) {
- throw new JobRepositoryException(e.getMessage());
+ throw new JobRepositoryException(e.getMessage(), e);
}
return XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
@@ -590,9 +590,9 @@ public class XmlRpcResourceManagerClient
try {
jobId = (String) client.execute("resourcemgr.handleJob", argList);
} catch (XmlRpcException e) {
- throw new JobExecutionException(e.getMessage());
+ throw new JobExecutionException(e.getMessage(), e);
} catch (IOException e) {
- throw new JobExecutionException(e.getMessage());
+ throw new JobExecutionException(e.getMessage(), e);
}
return jobId;
@@ -612,9 +612,9 @@ public class XmlRpcResourceManagerClient
success = ((Boolean) client.execute("resourcemgr.handleJob",
argList)).booleanValue();
} catch (XmlRpcException e) {
- throw new JobExecutionException(e.getMessage());
+ throw new JobExecutionException(e.getMessage(), e);
} catch (IOException e) {
- throw new JobExecutionException(e.getMessage());
+ throw new JobExecutionException(e.getMessage(), e);
}
return success;
@@ -630,9 +630,9 @@ public class XmlRpcResourceManagerClient
nodeVector = (Vector) client.execute("resourcemgr.getNodes",
argList);
} catch (XmlRpcException e) {
- throw new MonitorException(e.getMessage());
+ throw new MonitorException(e.getMessage(), e);
} catch (IOException e) {
- throw new MonitorException(e.getMessage());
+ throw new MonitorException(e.getMessage(), e);
}
return XmlRpcStructFactory.getResourceNodeListFromXmlRpc(nodeVector);
@@ -649,9 +649,9 @@ public class XmlRpcResourceManagerClient
resNodeHash = (Hashtable) client.execute("resourcemgr.getNodeById",
argList);
} catch (XmlRpcException e) {
- throw new MonitorException(e.getMessage());
+ throw new MonitorException(e.getMessage(), e);
} catch (IOException e) {
- throw new MonitorException(e.getMessage());
+ throw new MonitorException(e.getMessage(), e);
}
return XmlRpcStructFactory.getResourceNodeFromXmlRpc(resNodeHash);
@@ -839,19 +839,21 @@ public class XmlRpcResourceManagerClient
throw new MonitorException(e.getMessage(), e);
}
}
-
- private static String getReadableJobStatus(String status) {
- if (status.equals(JobStatus.COMPLETE)) {
- return "COMPLETE";
- } else if (status.equals(JobStatus.EXECUTED)) {
- return "EXECUTED";
- } else if (status.equals(JobStatus.QUEUED)) {
- return "QUEUED";
- } else if (status.equals(JobStatus.SCHEDULED)) {
- return "SCHEDULED";
- } else if (status.equals(JobStatus.KILLED)) {
- return "KILLED";
- } else
- return null;
- }
+
+ private static String getReadableJobStatus(String status) {
+ if (status.equals(JobStatus.SUCCESS)) {
+ return "SUCCESS";
+ } else if (status.equals(JobStatus.FAILURE)) {
+ return "FAILURE";
+ } else if (status.equals(JobStatus.EXECUTED)) {
+ return "EXECUTED";
+ } else if (status.equals(JobStatus.QUEUED)) {
+ return "QUEUED";
+ } else if (status.equals(JobStatus.SCHEDULED)) {
+ return "SCHEDULED";
+ } else if (status.equals(JobStatus.KILLED)) {
+ return "KILLED";
+ } else
+ return null;
+ }
}
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java
(original)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java
Sun Jun 19 18:25:41 2011
@@ -174,7 +174,7 @@ public class XmlRpcBatchStub {
endThread = null;
}
- return true;
+ return runner.wasSuccessful();
} catch (Exception e) {
e.printStackTrace();
return false;
@@ -211,9 +211,12 @@ public class XmlRpcBatchStub {
private JobInstance job;
+ private boolean successful;
+
public RunnableJob(JobInstance job, JobInput in) {
this.job = job;
this.in = in;
+ this.successful = false;
}
/*
@@ -223,12 +226,16 @@ public class XmlRpcBatchStub {
*/
public void run() {
try {
- job.execute(in);
+ this.successful = job.execute(in);
} catch (JobInputException e) {
e.printStackTrace();
+ this.successful = false;
}
}
+ public boolean wasSuccessful() {
+ return this.successful;
+ }
}
}