Author: mona
Date: Tue Jun 11 04:41:37 2013
New Revision: 1491688
URL: http://svn.apache.org/r1491688
Log:
OOZIE-1394 Fix Bugs in Job and SLA Events (mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
Tue Jun 11 04:41:37 2013
@@ -110,10 +110,10 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select
a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies,
a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where
a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND
a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
// Query to retrieve action id, action status, pending status and
external Id of running Coordinator actions
- @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id,
a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId =
:jobId and a.status = 'RUNNING'"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id,
a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from
CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"),
// Query to retrieve action id, action status, pending status and
external Id of suspended Coordinator actions
- @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select
a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where
a.jobId = :jobId and a.status = 'SUSPENDED'"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select
a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp
from CoordinatorActionBean a where a.jobId = :jobId and a.status =
'SUSPENDED'"),
// Query to retrieve count of Coordinator actions which are pending
@NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending >
0"),
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
Tue Jun 11 04:41:37 2013
@@ -82,10 +82,9 @@ public abstract class TransitionXCommand
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean caBean = (CoordinatorActionBean)
actionBean;
caBean.setJobId(coordJob.getId());
- if (EventHandlerService.isEnabled()) {
- CoordinatorXCommand.generateEvent(caBean,
coordJob.getUser(), coordJob.getAppName());
- }
+ CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(),
coordJob.getAppName());
}
+ // TODO generate Coord Job event
}
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
Tue Jun 11 04:41:37 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
@@ -168,7 +169,9 @@ public class CoordKillXCommand extends K
public void performWrites() throws CommandException {
try {
jpaService.execute(new
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
- generateEvents(coordJob);
+ if (EventHandlerService.isEnabled()) {
+ generateEvents(coordJob);
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
Tue Jun 11 04:41:37 2013
@@ -35,7 +35,6 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
-import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
@@ -412,12 +411,7 @@ public class CoordRerunXCommand extends
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList,
insertList));
if (EventHandlerService.isEnabled()) {
- for (JsonBean bean : updateList) {
- if (bean instanceof CoordinatorActionBean) {
-
CoordinatorXCommand.generateEvent((CoordinatorActionBean) bean,
coordJob.getUser(),
- coordJob.getAppName());
- }
- }
+ generateEvents(coordJob);
}
}
catch (JPAExecutorException e) {
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
Tue Jun 11 04:41:37 2013
@@ -26,6 +26,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.ResumeTransitionXCommand;
@@ -35,6 +36,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -179,6 +181,9 @@ public class CoordResumeXCommand extends
public void performWrites() throws CommandException {
try {
jpaService.execute(new
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
+ if (EventHandlerService.isEnabled()) {
+ generateEvents(coordJob);
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
Tue Jun 11 04:41:37 2013
@@ -35,6 +35,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -185,7 +186,9 @@ public class CoordSuspendXCommand extend
public void performWrites() throws CommandException {
try {
jpaService.execute(new
BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
- generateEvents(coordJob);
+ if (EventHandlerService.isEnabled()) {
+ generateEvents(coordJob);
+ }
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
Tue Jun 11 04:41:37 2013
@@ -23,6 +23,7 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
@@ -33,6 +34,7 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
@@ -324,6 +326,11 @@ public class SignalXCommand extends Work
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList,
insertList));
if (generateEvent && EventHandlerService.isEnabled()) {
+ CoordinatorActionBean coordAction = jpaService.execute(new
CoordActionGetForExternalIdJPAExecutor(wfJob
+ .getId()));
+ if (coordAction != null) {
+ wfJob.setParentId(coordAction.getId());
+ }
generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
}
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsRunningJPAExecutor.java
Tue Jun 11 04:41:37 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.oozie.CoordinatorJobBe
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
/**
@@ -88,6 +90,12 @@ public class CoordJobGetActionsRunningJP
if (arr[3] != null) {
bean.setExternalId((String) arr[3]);
}
+ if (arr[4] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
+ }
+ if (arr[5] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[5]));
+ }
return bean;
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSuspendedJPAExecutor.java
Tue Jun 11 04:41:37 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -24,10 +25,9 @@ import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
/**
@@ -89,6 +89,12 @@ public class CoordJobGetActionsSuspended
if (arr[3] != null) {
bean.setExternalId((String) arr[3]);
}
+ if (arr[4] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
+ }
+ if (arr[5] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[5]));
+ }
return bean;
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
Tue Jun 11 04:41:37 2013
@@ -60,10 +60,12 @@ public class SLACalculatorMemory impleme
private static JPAService jpaService;
private EventHandlerService eventHandler;
private static int modifiedAfter;
+ private static long jobEventLatency;
@Override
public void init(Configuration conf) throws ServiceException {
capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
+ jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 *
1000);
slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
historySet = Collections.synchronizedSet(new HashSet<String>());
jpaService = Services.get().get(JPAService.class);
@@ -144,7 +146,7 @@ public class SLACalculatorMemory impleme
// calculation w.r.t current time and status
if ((eventProc & 1) == 0) { // first bit (start-processed) unset
if (reg.getExpectedStart() != null) {
- if (reg.getExpectedStart().getTime() <
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ if (reg.getExpectedStart().getTime() + jobEventLatency <
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
.getTimeInMillis()) {
slaCalc.setEventStatus(EventStatus.START_MISS);
change = true;
@@ -160,8 +162,8 @@ public class SLACalculatorMemory impleme
}
if (((eventProc >> 1) & 1) == 0) { // check if second bit
(duration-processed) is unset
if (slaCalc.getActualStart() != null) {
- if (reg.getExpectedDuration() <
Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
- - slaCalc.getActualStart().getTime()) {
+ if (reg.getExpectedDuration() + jobEventLatency <
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis() -
slaCalc.getActualStart().getTime()) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
change = true;
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
@@ -170,8 +172,8 @@ public class SLACalculatorMemory impleme
}
}
if (eventProc < 4) {
- if (reg.getExpectedEnd().getTime() <
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
- .getTimeInMillis()) {
+ if (reg.getExpectedEnd().getTime() + jobEventLatency < Calendar
+
.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
change = true;
@@ -335,7 +337,7 @@ public class SLACalculatorMemory impleme
* @param actualStart
* @return SLASummaryBean
*/
- public SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date
actualStart) {
+ private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date
actualStart) {
slaCalc.setActualStart(actualStart);
if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
@@ -368,8 +370,9 @@ public class SLACalculatorMemory impleme
* @param actualStart
* @param actualEnd
* @return SLASummaryBean
+ * @throws JPAExecutorException
*/
- public SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date
actualStart, Date actualEnd) {
+ private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date
actualStart, Date actualEnd) throws JPAExecutorException {
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
slaCalc.setActualStart(actualStart);
slaCalc.setActualEnd(actualEnd);
@@ -415,8 +418,9 @@ public class SLACalculatorMemory impleme
* @param actualStart
* @param actualEnd
* @return SLASummaryBean
+ * @throws JPAExecutorException
*/
- public SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date
actualStart, Date actualEnd) {
+ private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date
actualStart, Date actualEnd) throws JPAExecutorException {
slaCalc.setActualStart(actualStart);
slaCalc.setActualEnd(actualEnd);
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
Tue Jun 11 04:41:37 2013
@@ -18,25 +18,18 @@
package org.apache.oozie.sla.service;
import java.util.Date;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import
org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import
org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
-import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLACalcStatus;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
@@ -47,13 +40,12 @@ public class SLAService implements Servi
public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
public static final String CONF_ALERT_EVENTS = CONF_PREFIX +
"alert.events";
public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX +
"events.modified.after";
+ public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX +
"job.event.latency";
private static SLACalculator calcImpl;
private static boolean slaEnabled = false;
private EventHandlerService eventHandler;
public static XLog LOG;
- private JPAService jpaService;
-
@Override
public void init(Services services) throws ServiceException {
try {
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Tue Jun 11 04:41:37
2013
@@ -1870,7 +1870,7 @@
<property>
<name>oozie.service.EventHandlerService.worker.interval</name>
- <value>60</value>
+ <value>30</value>
<description>
The default interval (seconds) at which the worker threads will be
scheduled to run
and process events.
@@ -1918,11 +1918,12 @@
</property>
<property>
- <name>oozie.sla.service.SLAService.events.modified.after</name>
- <value>7</value>
- <description> Retrive records which have been modified in last x
amount of days (x is 7 by default)
+ <name>oozie.sla.service.SLAService.job.event.latency</name>
+ <value>90000</value>
+ <description>
+ Time in milliseconds to account of latency of getting the job
status event
+ to compare against and decide sla miss/met
</description>
</property>
-
</configuration>
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
Tue Jun 11 04:41:37 2013
@@ -42,10 +42,13 @@ import org.apache.oozie.client.WorkflowA
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordRerunXCommand;
+import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordinatorXCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.command.wf.ActionKillXCommand;
@@ -295,6 +298,40 @@ public class TestEventGeneration extends
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
+ // Action start on Coord Resume
+ coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+ action.setStatus(CoordinatorAction.Status.SUSPENDED);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ new CoordResumeXCommand(coord.getId()).call();
+ CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
+ assertEquals(EventStatus.STARTED, cevent.getEventStatus());
+ assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
+ assertEquals(action.getId(), cevent.getId());
+ assertEquals(action.getJobId(), cevent.getParentId());
+ assertEquals(action.getNominalTime(), cevent.getNominalTime());
+ assertEquals(action.getCreatedTime(), cevent.getStartTime());
+
+ // Action going to WAITING on Coord Rerun
+ action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ new CoordRerunXCommand(coord.getId(),
RestConstants.JOB_COORD_RERUN_ACTION, "1", false, true)
+ .call();
+ waitFor(3 * 100, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jpaService.execute(coordGetCmd).getStatus() ==
CoordinatorAction.Status.WAITING;
+ }
+ });
+ cevent = (CoordinatorActionEvent) queue.poll();
+ assertEquals(EventStatus.WAITING, cevent.getEventStatus());
+ assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
+ assertEquals(action.getId(), cevent.getId());
+ assertEquals(action.getJobId(), cevent.getParentId());
+ assertEquals(action.getNominalTime(), cevent.getNominalTime());
+ assertEquals(action.getCreatedTime(), event.getStartTime());
+ assertNotNull(cevent.getMissingDeps());
+
}
@Test
@@ -458,7 +495,7 @@ public class TestEventGeneration extends
_modifyCoordForFailureAction(coord);
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
final CoordJobGetJPAExecutor readCmd1 = new
CoordJobGetJPAExecutor(coord.getId());
- waitFor(1 * 300, new Predicate() {
+ waitFor(1 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
CoordinatorJobBean bean = jpaService.execute(readCmd1);
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1491688&r1=1491687&r2=1491688&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Jun 11 04:41:37 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1394 Fix Bugs in Job and SLA Events (mona)
OOZIE-1315 Refactor classes from launcher jar into Oozie sharelib (rkanter)
OOZIE-1377 OpenJPA runtime enhancement should be disabled and update OpenJPA
to 2.2.2 (tucu)
OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator
(virag)