Author: mattmann
Date: Sun Jun 19 18:25:41 2011
New Revision: 1137402

URL: http://svn.apache.org/viewvc?rev=1137402&view=rev
Log:
- fix for OODT-279 Make Resource Manager FAILURE and SUCCESS aware instead of
  just COMPLETE aware

Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java

Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Sun Jun 19 18:25:41 2011
@@ -4,7 +4,11 @@ Apache OODT Change Log
 Release 0.3-SNAPSHOT (in progress)
 --------------------------------------------
 
-* OODT-278 CAS-PGE returns success even if product file(s) fail to ingest (mattmann, bfoster)
+* OODT-279 Make Resource Manager FAILURE and SUCCESS aware instead of just 
+  COMPLETE aware (mattmann, bfoster)
+
+* OODT-278 CAS-PGE returns success even if product file(s) fail to 
+  ingest (mattmann, bfoster)
 
 * OODT-243 Add method is called on already existing jobspec (mattmann, bfoster)
 

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgr.java Sun Jun 19 18:25:41 2011
@@ -149,8 +149,29 @@ public class XmlRpcBatchMgr implements B
         }
     }
 
-    protected void jobComplete(JobSpec spec) {
-        spec.getJob().setStatus(JobStatus.COMPLETE);
+    protected void jobSuccess(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.SUCCESS);
+        synchronized (this.nodeToJobMap) {
+            this.nodeToJobMap.remove(spec.getJob().getId());
+        }
+        synchronized (this.specToProxyMap) {
+            XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy) this.specToProxyMap
+                    .remove(spec.getJob().getId());
+            if (proxy != null) {
+                proxy = null;
+            }
+        }
+
+        try {
+            repo.updateJob(spec);
+        } catch (JobRepositoryException e) {
+            LOG.log(Level.WARNING, "Error set job completion status for job: ["
+                    + spec.getJob().getId() + "]: Message: " + e.getMessage());
+        }
+    }
+    
+    protected void jobFailure(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.FAILURE);
         synchronized (this.nodeToJobMap) {
             this.nodeToJobMap.remove(spec.getJob().getId());
         }

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/XmlRpcBatchMgrProxy.java Sun Jun 19 18:25:41 2011
@@ -21,6 +21,8 @@ package org.apache.oodt.cas.resource.bat
 //JDK imports
 import java.io.IOException;
 import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 //OODT imports
 import org.apache.oodt.cas.resource.structs.Job;
@@ -46,6 +48,8 @@ import org.apache.xmlrpc.XmlRpcException
  */
 public class XmlRpcBatchMgrProxy extends Thread implements Runnable {
 
+       private static final Logger LOG = Logger.getLogger(XmlRpcBatchMgrProxy.class.getName());
+       
     private JobSpec jobSpec;
 
     private ResourceNode remoteHost;
@@ -115,19 +119,17 @@ public class XmlRpcBatchMgrProxy extends
             parent.jobExecuting(jobSpec);
             result = ((Boolean) client
                     .execute("batchstub.executeJob", argList)).booleanValue();
-        } catch (XmlRpcException e) {
-            e.printStackTrace();
-            // throw new JobExecutionException(e);
-        } catch (IOException e) {
-            e.printStackTrace();
-            // throw new JobExecutionException(e);
+            if (result)
+               parent.jobSuccess(jobSpec);
+            else
+               throw new Exception("batchstub.executeJob returned false");
+        } catch (Exception e) {
+               LOG.log(Level.SEVERE, "Job execution failed for jobId '" + jobSpec.getJob().getId() + "' : " + e.getMessage(), e);
+            parent.jobFailure(jobSpec);
+        }finally {
+            parent.notifyMonitor(remoteHost, jobSpec);
         }
 
-        // notify the monitor job has finished;
-        parent.notifyMonitor(remoteHost, jobSpec);
-
-        // notify the job repository that the job has finished
-        parent.jobComplete(jobSpec);
-    }
+   }
 
 }

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/MemoryJobRepository.java Sun Jun 19 18:25:41 2011
@@ -80,7 +80,8 @@ public class MemoryJobRepository impleme
    */
   public boolean jobFinished(JobSpec spec) throws JobRepositoryException {
     JobSpec persistedSpec = (JobSpec) jobMap.get(spec.getJob().getId());
-    return persistedSpec.getJob().getStatus().equals(JobStatus.COMPLETE);
+    return persistedSpec.getJob().getStatus().equals(JobStatus.SUCCESS) 
+       ||  persistedSpec.getJob().getStatus().equals(JobStatus.FAILURE);
   }
 
   /*

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/jobrepo/XStreamJobRepository.java Sun Jun 19 18:25:41 2011
@@ -107,7 +107,7 @@ public class XStreamJobRepository implem
 
        public boolean jobFinished(JobSpec spec) throws JobRepositoryException {
                String status = this.getStatus(spec);
-           return status.equals(JobStatus.COMPLETE);
+           return status.equals(JobStatus.SUCCESS);
        }
 
        public synchronized void removeJob(JobSpec spec) throws JobRepositoryException {

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobStatus.java Sun Jun 19 18:25:41 2011
@@ -32,7 +32,9 @@ public interface JobStatus {
   
   public static final String SCHEDULED = "__Scheduled__";
   
-  public static final String COMPLETE = "__Complete__";
+  public static final String SUCCESS = "__Success__";
+  
+  public static final String FAILURE = "__Failure__";
   
   public static final String KILLED = "__Killed__";
 

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java Sun Jun 19 18:25:41 2011
@@ -483,9 +483,9 @@ public class XmlRpcResourceManagerClient
             complete = ((Boolean) client.execute("resourcemgr.isJobComplete",
                     argList)).booleanValue();
         } catch (XmlRpcException e) {
-            throw new JobRepositoryException(e.getMessage());
+            throw new JobRepositoryException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new JobRepositoryException(e.getMessage());
+            throw new JobRepositoryException(e.getMessage(), e);
         }
 
         return complete;
@@ -501,9 +501,9 @@ public class XmlRpcResourceManagerClient
             jobHash = (Hashtable) client.execute("resourcemgr.getJobInfo",
                     argList);
         } catch (XmlRpcException e) {
-            throw new JobRepositoryException(e.getMessage());
+            throw new JobRepositoryException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new JobRepositoryException(e.getMessage());
+            throw new JobRepositoryException(e.getMessage(), e);
         }
 
         return XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
@@ -590,9 +590,9 @@ public class XmlRpcResourceManagerClient
         try {
             jobId = (String) client.execute("resourcemgr.handleJob", argList);
         } catch (XmlRpcException e) {
-            throw new JobExecutionException(e.getMessage());
+            throw new JobExecutionException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new JobExecutionException(e.getMessage());
+            throw new JobExecutionException(e.getMessage(), e);
         }
 
         return jobId;
@@ -612,9 +612,9 @@ public class XmlRpcResourceManagerClient
             success = ((Boolean) client.execute("resourcemgr.handleJob",
                     argList)).booleanValue();
         } catch (XmlRpcException e) {
-            throw new JobExecutionException(e.getMessage());
+            throw new JobExecutionException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new JobExecutionException(e.getMessage());
+            throw new JobExecutionException(e.getMessage(), e);
         }
 
         return success;
@@ -630,9 +630,9 @@ public class XmlRpcResourceManagerClient
             nodeVector = (Vector) client.execute("resourcemgr.getNodes",
                     argList);
         } catch (XmlRpcException e) {
-            throw new MonitorException(e.getMessage());
+            throw new MonitorException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new MonitorException(e.getMessage());
+            throw new MonitorException(e.getMessage(), e);
         }
 
         return XmlRpcStructFactory.getResourceNodeListFromXmlRpc(nodeVector);
@@ -649,9 +649,9 @@ public class XmlRpcResourceManagerClient
             resNodeHash = (Hashtable) client.execute("resourcemgr.getNodeById",
                     argList);
         } catch (XmlRpcException e) {
-            throw new MonitorException(e.getMessage());
+            throw new MonitorException(e.getMessage(), e);
         } catch (IOException e) {
-            throw new MonitorException(e.getMessage());
+            throw new MonitorException(e.getMessage(), e);
         }
 
         return XmlRpcStructFactory.getResourceNodeFromXmlRpc(resNodeHash);
@@ -839,19 +839,21 @@ public class XmlRpcResourceManagerClient
                throw new MonitorException(e.getMessage(), e);
        }
     }
-    
-    private static String getReadableJobStatus(String status) {
-        if (status.equals(JobStatus.COMPLETE)) {
-            return "COMPLETE";
-        } else if (status.equals(JobStatus.EXECUTED)) {
-            return "EXECUTED";
-        } else if (status.equals(JobStatus.QUEUED)) {
-            return "QUEUED";
-        } else if (status.equals(JobStatus.SCHEDULED)) {
-            return "SCHEDULED";
-        } else if (status.equals(JobStatus.KILLED)) {
-            return "KILLED";
-        } else
-            return null;
-    }
+
+  private static String getReadableJobStatus(String status) {
+    if (status.equals(JobStatus.SUCCESS)) {
+      return "SUCCESS";
+    } else if (status.equals(JobStatus.FAILURE)) {
+      return "FAILURE";
+    } else if (status.equals(JobStatus.EXECUTED)) {
+      return "EXECUTED";
+    } else if (status.equals(JobStatus.QUEUED)) {
+      return "QUEUED";
+    } else if (status.equals(JobStatus.SCHEDULED)) {
+      return "SCHEDULED";
+    } else if (status.equals(JobStatus.KILLED)) {
+      return "KILLED";
+    } else
+      return null;
+  }
 }

Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java?rev=1137402&r1=1137401&r2=1137402&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/XmlRpcBatchStub.java Sun Jun 19 18:25:41 2011
@@ -174,7 +174,7 @@ public class XmlRpcBatchStub {
                     endThread = null;
             }
 
-            return true;
+            return runner.wasSuccessful();
         } catch (Exception e) {
             e.printStackTrace();
             return false;
@@ -211,9 +211,12 @@ public class XmlRpcBatchStub {
 
         private JobInstance job;
 
+        private boolean successful;
+        
         public RunnableJob(JobInstance job, JobInput in) {
             this.job = job;
             this.in = in;
+            this.successful = false;
         }
 
         /*
@@ -223,12 +226,16 @@ public class XmlRpcBatchStub {
          */
         public void run() {
             try {
-                job.execute(in);
+                this.successful = job.execute(in);
             } catch (JobInputException e) {
                 e.printStackTrace();
+                this.successful = false;
             }
 
         }
 
+        public boolean wasSuccessful() {
+               return this.successful;
+        }
     }
 }


Reply via email to