Author: mona
Date: Tue Jun 11 04:41:37 2013
New Revision: 1491688

URL: http://svn.apache.org/r1491688
Log:
OOZIE-1394 Fix Bugs in Job and SLA Events (mona)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
Tue Jun 11 04:41:37 2013
@@ -110,10 +110,10 @@ import org.apache.openjpa.persistence.jd
         @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select 
a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies, 
a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where 
a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND 
a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
 
         // Query to retrieve action id, action status, pending status and 
external Id of running Coordinator actions
-        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, 
a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = 
:jobId and a.status = 'RUNNING'"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, 
a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from 
CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"),
 
         // Query to retrieve action id, action status, pending status and 
external Id of suspended Coordinator actions
-        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select 
a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where 
a.jobId = :jobId and a.status = 'SUSPENDED'"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select 
a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp 
from CoordinatorActionBean a where a.jobId = :jobId and a.status = 
'SUSPENDED'"),
 
         // Query to retrieve count of Coordinator actions which are pending
         @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select 
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 
0"),

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java 
(original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java 
Tue Jun 11 04:41:37 2013
@@ -82,10 +82,9 @@ public abstract class TransitionXCommand
             if (actionBean instanceof CoordinatorActionBean) {
                 CoordinatorActionBean caBean = (CoordinatorActionBean) 
actionBean;
                 caBean.setJobId(coordJob.getId());
-                if (EventHandlerService.isEnabled()) {
-                    CoordinatorXCommand.generateEvent(caBean, 
coordJob.getUser(), coordJob.getAppName());
-                }
+                CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), 
coordJob.getAppName());
             }
+            // TODO generate Coord Job event
         }
     }
 

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
 Tue Jun 11 04:41:37 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
@@ -168,7 +169,9 @@ public class CoordKillXCommand extends K
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new 
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
-            generateEvents(coordJob);
+            if (EventHandlerService.isEnabled()) {
+                generateEvents(coordJob);
+            }
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
 Tue Jun 11 04:41:37 2013
@@ -35,7 +35,6 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
-import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
@@ -412,12 +411,7 @@ public class CoordRerunXCommand extends 
         try {
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, 
insertList));
             if (EventHandlerService.isEnabled()) {
-                for (JsonBean bean : updateList) {
-                    if (bean instanceof CoordinatorActionBean) {
-                        
CoordinatorXCommand.generateEvent((CoordinatorActionBean) bean, 
coordJob.getUser(),
-                                coordJob.getAppName());
-                    }
-                }
+                generateEvents(coordJob);
             }
         }
         catch (JPAExecutorException e) {

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
 Tue Jun 11 04:41:37 2013
@@ -26,6 +26,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.ResumeTransitionXCommand;
@@ -35,6 +36,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -179,6 +181,9 @@ public class CoordResumeXCommand extends
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new 
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
+            if (EventHandlerService.isEnabled()) {
+                generateEvents(coordJob);
+            }
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
 Tue Jun 11 04:41:37 2013
@@ -35,6 +35,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -185,7 +186,9 @@ public class CoordSuspendXCommand extend
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new 
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
-            generateEvents(coordJob);
+            if (EventHandlerService.isEnabled()) {
+                generateEvents(coordJob);
+            }
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
(original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
Tue Jun 11 04:41:37 2013
@@ -23,6 +23,7 @@ import org.apache.oozie.client.WorkflowJ
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
@@ -33,6 +34,7 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
@@ -324,6 +326,11 @@ public class SignalXCommand extends Work
             // call JPAExecutor to do the bulk writes
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, 
insertList));
             if (generateEvent && EventHandlerService.isEnabled()) {
+                CoordinatorActionBean coordAction = jpaService.execute(new 
CoordActionGetForExternalIdJPAExecutor(wfJob
+                        .getId()));
+                if (coordAction != null) {
+                    wfJob.setParentId(coordAction.getId());
+                }
                 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
             }
         }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
 Tue Jun 11 04:41:37 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,6 +29,7 @@ import org.apache.oozie.CoordinatorJobBe
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 
 /**
@@ -88,6 +90,12 @@ public class CoordJobGetActionsRunningJP
         if (arr[3] != null) {
             bean.setExternalId((String) arr[3]);
         }
+        if (arr[4] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
+        }
+        if (arr[5] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[5]));
+        }
         return bean;
     }
 

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
 Tue Jun 11 04:41:37 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -24,10 +25,9 @@ import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 
 /**
@@ -89,6 +89,12 @@ public class CoordJobGetActionsSuspended
         if (arr[3] != null) {
             bean.setExternalId((String) arr[3]);
         }
+        if (arr[4] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
+        }
+        if (arr[5] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[5]));
+        }
         return bean;
     }
 

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java 
(original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java 
Tue Jun 11 04:41:37 2013
@@ -60,10 +60,12 @@ public class SLACalculatorMemory impleme
     private static JPAService jpaService;
     private EventHandlerService eventHandler;
     private static int modifiedAfter;
+    private static long jobEventLatency;
 
     @Override
     public void init(Configuration conf) throws ServiceException {
         capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
+        jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 
1000);
         slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
         historySet = Collections.synchronizedSet(new HashSet<String>());
         jpaService = Services.get().get(JPAService.class);
@@ -144,7 +146,7 @@ public class SLACalculatorMemory impleme
             // calculation w.r.t current time and status
             if ((eventProc & 1) == 0) { // first bit (start-processed) unset
                 if (reg.getExpectedStart() != null) {
-                    if (reg.getExpectedStart().getTime() < 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+                    if (reg.getExpectedStart().getTime() + jobEventLatency < 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
                             .getTimeInMillis()) {
                         slaCalc.setEventStatus(EventStatus.START_MISS);
                         change = true;
@@ -160,8 +162,8 @@ public class SLACalculatorMemory impleme
             }
             if (((eventProc >> 1) & 1) == 0) { // check if second bit 
(duration-processed) is unset
                 if (slaCalc.getActualStart() != null) {
-                    if (reg.getExpectedDuration() < 
Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
-                            - slaCalc.getActualStart().getTime()) {
+                    if (reg.getExpectedDuration() + jobEventLatency < 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+                            .getTimeInMillis() - 
slaCalc.getActualStart().getTime()) {
                         slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                         change = true;
                         eventHandler.queueEvent(new SLACalcStatus(slaCalc));
@@ -170,8 +172,8 @@ public class SLACalculatorMemory impleme
                 }
             }
             if (eventProc < 4) {
-                if (reg.getExpectedEnd().getTime() < 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
-                        .getTimeInMillis()) {
+                if (reg.getExpectedEnd().getTime() + jobEventLatency < Calendar
+                        
.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()) {
                     slaCalc.setEventStatus(EventStatus.END_MISS);
                     slaCalc.setSLAStatus(SLAStatus.MISS);
                     change = true;
@@ -335,7 +337,7 @@ public class SLACalculatorMemory impleme
      * @param actualStart
      * @return SLASummaryBean
      */
-    public SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date 
actualStart) {
+    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date 
actualStart) {
         slaCalc.setActualStart(actualStart);
         if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
             slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
@@ -368,8 +370,9 @@ public class SLACalculatorMemory impleme
      * @param actualStart
      * @param actualEnd
      * @return SLASummaryBean
+     * @throws JPAExecutorException
      */
-    public SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) {
+    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) throws JPAExecutorException {
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
         slaCalc.setActualStart(actualStart);
         slaCalc.setActualEnd(actualEnd);
@@ -415,8 +418,9 @@ public class SLACalculatorMemory impleme
      * @param actualStart
      * @param actualEnd
      * @return SLASummaryBean
+     * @throws JPAExecutorException
      */
-    public SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) {
+    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) throws JPAExecutorException {
         slaCalc.setActualStart(actualStart);
         slaCalc.setActualEnd(actualEnd);
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java 
Tue Jun 11 04:41:37 2013
@@ -18,25 +18,18 @@
 package org.apache.oozie.sla.service;
 
 import java.util.Date;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import 
org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import 
org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
-import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLACalcStatus;
 import org.apache.oozie.sla.SLACalculator;
 import org.apache.oozie.sla.SLACalculatorMemory;
 import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.sla.SLASummaryBean;
 import org.apache.oozie.util.XLog;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -47,13 +40,12 @@ public class SLAService implements Servi
     public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
     public static final String CONF_ALERT_EVENTS = CONF_PREFIX + 
"alert.events";
     public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + 
"events.modified.after";
+    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + 
"job.event.latency";
 
     private static SLACalculator calcImpl;
     private static boolean slaEnabled = false;
     private EventHandlerService eventHandler;
     public static XLog LOG;
-    private JPAService jpaService;
-
     @Override
     public void init(Services services) throws ServiceException {
         try {

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Tue Jun 11 04:41:37 
2013
@@ -1870,7 +1870,7 @@
 
     <property>
         <name>oozie.service.EventHandlerService.worker.interval</name>
-        <value>60</value>
+        <value>30</value>
         <description>
             The default interval (seconds) at which the worker threads will be 
scheduled to run
             and process events.
@@ -1918,11 +1918,12 @@
     </property>
 
     <property>
-        <name>oozie.sla.service.SLAService.events.modified.after</name>
-        <value>7</value>
-        <description> Retrive records which have been modified in last x 
amount of days (x is 7 by default)
+        <name>oozie.sla.service.SLAService.job.event.latency</name>
+        <value>90000</value>
+        <description>
+             Time in milliseconds to account of latency of getting the job 
status event
+             to compare against and decide sla miss/met
         </description>
     </property>
 
-
 </configuration>

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java 
(original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java 
Tue Jun 11 04:41:37 2013
@@ -42,10 +42,13 @@ import org.apache.oozie.client.WorkflowA
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.coord.CoordActionCheckXCommand;
 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordRerunXCommand;
+import org.apache.oozie.command.coord.CoordResumeXCommand;
 import org.apache.oozie.command.coord.CoordinatorXCommand;
 import org.apache.oozie.command.wf.ActionCheckXCommand;
 import org.apache.oozie.command.wf.ActionKillXCommand;
@@ -295,6 +298,40 @@ public class TestEventGeneration extends
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
 
+        // Action start on Coord Resume
+        coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+        action.setStatus(CoordinatorAction.Status.SUSPENDED);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        new CoordResumeXCommand(coord.getId()).call();
+        CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
+        assertEquals(EventStatus.STARTED, cevent.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
+        assertEquals(action.getId(), cevent.getId());
+        assertEquals(action.getJobId(), cevent.getParentId());
+        assertEquals(action.getNominalTime(), cevent.getNominalTime());
+        assertEquals(action.getCreatedTime(), cevent.getStartTime());
+
+        // Action going to WAITING on Coord Rerun
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        new CoordRerunXCommand(coord.getId(), 
RestConstants.JOB_COORD_RERUN_ACTION, "1", false, true)
+                .call();
+        waitFor(3 * 100, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return jpaService.execute(coordGetCmd).getStatus() == 
CoordinatorAction.Status.WAITING;
+            }
+        });
+        cevent = (CoordinatorActionEvent) queue.poll();
+        assertEquals(EventStatus.WAITING, cevent.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
+        assertEquals(action.getId(), cevent.getId());
+        assertEquals(action.getJobId(), cevent.getParentId());
+        assertEquals(action.getNominalTime(), cevent.getNominalTime());
+        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertNotNull(cevent.getMissingDeps());
+
     }
 
     @Test
@@ -458,7 +495,7 @@ public class TestEventGeneration extends
         _modifyCoordForFailureAction(coord);
         new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
         final CoordJobGetJPAExecutor readCmd1 = new 
CoordJobGetJPAExecutor(coord.getId());
-        waitFor(1 * 300, new Predicate() {
+        waitFor(1 * 100, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 CoordinatorJobBean bean = jpaService.execute(readCmd1);

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Jun 11 04:41:37 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1394 Fix Bugs in Job and SLA Events (mona)
 OOZIE-1315 Refactor classes from launcher jar into Oozie sharelib (rkanter)
 OOZIE-1377 OpenJPA runtime enhancement should be disabled and update OpenJPA 
to 2.2.2 (tucu)
 OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator 
(virag)


Reply via email to