Author: rohini
Date: Fri Aug 2 00:46:06 2013
New Revision: 1509503
URL: http://svn.apache.org/r1509503
Log:
OOZIE-1479 Duplicate end_miss events introduced by OOZIE-1472 (rohini)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
oozie/branches/branch-4.0/release-log.txt
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1509503&r1=1509502&r2=1509503&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Fri Aug 2 00:46:06 2013
@@ -290,15 +290,13 @@ public class SLACalculatorMemory impleme
if (reg.getExpectedStart() != null) {
if (reg.getExpectedStart().getTime() + jobEventLatency <
System.currentTimeMillis()) {
confirmWithDB(slaCalc);
- if ((slaCalc.getEventProcessed() & 1 ) == 0) {
+ eventProc = slaCalc.getEventProcessed();
+ if (eventProc != 8 && (eventProc & 1 ) == 0) {
//Some DB exception
slaCalc.setEventStatus(EventStatus.START_MISS);
eventHandler.queueEvent(new
SLACalcStatus(slaCalc));
eventProc++;
}
- else {
- eventProc = slaCalc.getEventProcessed();
- }
change = true;
}
}
@@ -307,7 +305,7 @@ public class SLACalculatorMemory impleme
change = true;
}
}
- if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
+ if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
if (reg.getExpectedDuration() == -1) {
eventProc += 2;
change = true;
@@ -317,15 +315,13 @@ public class SLACalculatorMemory impleme
.getActualStart().getTime())) {
slaCalc.setEventProcessed(eventProc);
confirmWithDB(slaCalc);
- if (((slaCalc.getEventProcessed() >> 1) & 1 ) == 0) {
+ eventProc = slaCalc.getEventProcessed();
+ if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
//Some DB exception
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
eventHandler.queueEvent(new
SLACalcStatus(slaCalc));
eventProc += 2;
}
- else {
- eventProc = slaCalc.getEventProcessed();
- }
change = true;
}
}
@@ -339,8 +335,9 @@ public class SLACalculatorMemory impleme
}
}
if (change) {
- if (slaCalc.getEventProcessed() == 8) { //no more processing, no transfer to history set
- eventProc = slaCalc.getEventProcessed();
+ if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
+ eventProc = 8;
+ slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
slaMap.remove(jobId);
}
else {
@@ -620,7 +617,7 @@ public class SLACalculatorMemory impleme
slaCalc.setActualStart(actualStart);
slaCalc.setActualEnd(actualEnd);
if (actualStart == null) { // job failed before starting
- if (slaCalc.getEventProcessed() != 5) { // 101 = end+start already processed
+ if (slaCalc.getEventProcessed() < 4) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
@@ -742,6 +739,15 @@ public class SLACalculatorMemory impleme
slaCalc.setSLAStatus(SLAStatus.MET);
}
if (slaCalc.getActualStart() != null) {
+ if ((eventProc & 1) == 0) {
+ if (slaCalc.getExpectedStart().getTime() <
slaCalc.getActualStart().getTime()) {
+ slaCalc.setEventStatus(EventStatus.START_MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.START_MET);
+ }
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
slaCalc.setActualDuration(slaCalc.getActualEnd().getTime()
- slaCalc.getActualStart().getTime());
if (((eventProc >> 1) & 1) == 0) {
processDurationSLA(slaCalc.getExpectedDuration(),
slaCalc.getActualDuration(), slaCalc);
@@ -797,7 +803,8 @@ public class SLACalculatorMemory impleme
}
}
catch (Exception e) {
- LOG.warn("Error while confirming against DB: ", e);
+ LOG.warn("Error while confirming SLA against DB for jobid= " +
slaCalc.getId() + ". Exception is "
+ + e.getClass().getName() + ": " + e.getMessage());
if (slaCalc.getEventProcessed() < 4 &&
slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1509503&r1=1509502&r2=1509503&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Fri Aug 2 00:46:06 2013
@@ -557,9 +557,8 @@ public class TestSLAEventGeneration exte
wf.setStatus(WorkflowJob.Status.FAILED);
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wf));
new KillXCommand(jobId).call();
- waitForEventGeneration(1000);
- ehs.getEventQueue().poll(); //ignore the wf-action event generated
- waitForEventGeneration(1000); //wait for wf job kill event to generate
+ waitForEventGeneration(1000); //wait for wf-action kill event to generate
+ Thread.sleep(200); //wait for wf job kill event to generate
ehs.new EventWorker().run();
waitForEventGeneration(2000); // time for listeners to run
ehs.getEventQueue().poll(); // ignore duration event
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java?rev=1509503&r1=1509502&r2=1509503&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java Fri Aug 2 00:46:06 2013
@@ -18,6 +18,7 @@
package org.apache.oozie.sla;
import java.util.Date;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
@@ -97,12 +98,14 @@ public class TestSLAService extends XDat
// test start-miss
SLARegistrationBean sla1 = _createSLARegistration("job-1",
AppType.WORKFLOW_JOB);
sla1.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 *
3600 * 1000)); //1 hour back
- sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600
* 1000)); //1 hour ahead
+ sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600
* 1000)); //1 hour back
+ sla1.setExpectedDuration(10 * 60 * 1000); //10 mins
slas.addRegistrationEvent(sla1);
assertEquals(1, slas.getSLACalculator().size());
slas.runSLAWorker();
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Sla START - MISS!!!"));
+ assertEventNoDuplicates(output.toString(), "Sla START - MISS!!!");
+ assertEventNoDuplicates(output.toString(), "Sla END - MISS!!!");
output.setLength(0);
// test different jobs and events start-met and end-miss
@@ -213,11 +216,13 @@ public class TestSLAService extends XDat
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
extWf = new WorkflowJobBean();
extWf.setId(action3.getExternalId());
- extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 *
1000));
extWf.setStartTime(new Date(System.currentTimeMillis() - 1 * 2100 *
1000));
+ extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 *
1000));
jpaService.execute(new WorkflowJobInsertJPAExecutor(extWf));
sla = _createSLARegistration(action3.getId(),
AppType.COORDINATOR_ACTION);
- sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1500 *
1000)); // in past but > actual end
+ sla.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 3600 *
1000)); // cause start_miss
+ sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1500 *
1000)); // in past but > actual end, end_met
+ sla.setExpectedDuration(0); //cause duration miss
slas.addRegistrationEvent(sla);
slas.runSLAWorker();
@@ -228,12 +233,14 @@ public class TestSLAService extends XDat
count++;
}
assertEquals(3, count); // only 3 out of the 5 are correct end_misses
- assertTrue(output.toString().contains(job1.getId() + " Sla END -
MISS!!!"));
- assertTrue(output.toString().contains(action1.getId() + " Sla END -
MISS!!!"));
- assertTrue(output.toString().contains(action2.getId() + " Sla END -
MISS!!!"));
- assertTrue(output.toString().contains(job2.getId() + " Sla END -
MET!!!"));
- assertTrue(output.toString().contains(job2.getId() + " Sla DURATION -
MISS!!!"));
- assertTrue(output.toString().contains(action3.getId() + " Sla END -
MET!!!"));
+ assertEventNoDuplicates(output.toString(), job1.getId() + " Sla END -
MISS!!!");
+ assertEventNoDuplicates(output.toString(), action1.getId() + " Sla END
- MISS!!!");
+ assertEventNoDuplicates(output.toString(), action2.getId() + " Sla END
- MISS!!!");
+ assertEventNoDuplicates(output.toString(), job2.getId() + " Sla END -
MET!!!");
+ assertEventNoDuplicates(output.toString(), job2.getId() + " Sla
DURATION - MISS!!!");
+ assertEventNoDuplicates(output.toString(), action3.getId() + " Sla
START - MISS!!!");
+ assertEventNoDuplicates(output.toString(), action3.getId() + " Sla
DURATION - MISS!!!");
+ assertEventNoDuplicates(output.toString(), action3.getId() + " Sla END
- MET!!!");
// negative on MISS after DB check, updated with actual times
SLASummaryBean slaSummary = jpaService.execute(new
SLASummaryGetJPAExecutor(job2.getId()));
@@ -296,6 +303,13 @@ public class TestSLAService extends XDat
return bean;
}
+ private void assertEventNoDuplicates(String outputStr, String eventMsg) {
+ int index = outputStr.indexOf(eventMsg);
+ assertTrue(index != -1);
+ //No duplicates
+ assertTrue(outputStr.indexOf(eventMsg, index + 1) == -1);
+ }
+
public static class DummySLAEventListener extends SLAEventListener {
@Override
Modified: oozie/branches/branch-4.0/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1509503&r1=1509502&r2=1509503&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Fri Aug 2 00:46:06 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 release
+OOZIE-1479 Duplicate end_miss events introduced by OOZIE-1472 (rohini)
OOZIE-1472 Confirm against database before generating start and duration miss
events (rohini)
OOZIE-1473 getKey() not overridden in some commands causing duplicates in
queue (virag)
OOZIE-1470 BundleStatusUpdateXCommand should get lock for bundle job (virag)