Author: mona
Date: Tue Sep 17 23:05:14 2013
New Revision: 1524248
URL: http://svn.apache.org/r1524248
Log:
OOZIE-1539 Load more coordinator jobs eligible to be materialized in MaterializeTriggerService (mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
Tue Sep 17 23:05:14 2013
@@ -109,48 +109,61 @@ public class CoordMaterializeTriggerServ
private void runCoordJobMatLookup() {
XLog.Info.get().clear();
XLog LOG = XLog.getLog(getClass());
- JPAService jpaService = Services.get().get(JPAService.class);
try {
-
// get current date
Date currDate = new Date(new Date().getTime() + lookupInterval
* 1000);
// get list of all jobs that have actions that should be materialized.
int materializationLimit = Services.get().getConf()
.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT,
CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
- CoordJobsToBeMaterializedJPAExecutor cmatcmd = new
CoordJobsToBeMaterializedJPAExecutor(currDate,
- materializationLimit);
+ // account for under-utilization of limit due to jobs maxed out
+ // against mat_throttle. hence repeat
+ if (materializeCoordJobs(currDate, materializationLimit, LOG)) {
+ materializeCoordJobs(currDate, materializationLimit, LOG);
+ }
+ }
+
+ catch (Exception ex) {
+ LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
+ }
+ }
+
+ private boolean materializeCoordJobs(Date currDate, int limit, XLog LOG) {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordJobsToBeMaterializedJPAExecutor cmatcmd = new
CoordJobsToBeMaterializedJPAExecutor(currDate, limit);
List<CoordinatorJobBean> materializeJobs =
jpaService.execute(cmatcmd);
+ int rejected = 0;
LOG.info("CoordMaterializeTriggerService - Curr Date= " +
currDate + ", Num jobs to materialize = "
+ materializeJobs.size());
for (CoordinatorJobBean coordJob : materializeJobs) {
if
(Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId()))
{
Services.get().get(InstrumentationService.class).get()
.incr(INSTRUMENTATION_GROUP,
INSTR_MAT_JOBS_COUNTER, 1);
- int numWaitingActions = jpaService
- .execute(new
CoordActionsActiveCountJPAExecutor(coordJob.getId()));
+ int numWaitingActions = jpaService.execute(new
CoordActionsActiveCountJPAExecutor(coordJob
+ .getId()));
LOG.info("Job :" + coordJob.getId() + "
numWaitingActions : " + numWaitingActions
+ " MatThrottle : " +
coordJob.getMatThrottling());
- // update lastModifiedTime so next time others might have higher chance to get pick up
+ // update lastModifiedTime so next time others get picked up in LRU fashion
coordJob.setLastModifiedTime(new Date());
CoordJobQueryExecutor.getInstance().executeUpdate(
-
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
- coordJob);
+
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
coordJob);
if (numWaitingActions >= coordJob.getMatThrottling()) {
- LOG.info("info for JobID [" + coordJob.getId() + "
already waiting "
- + numWaitingActions + " actions.
MatThrottle is : " + coordJob.getMatThrottling());
+ LOG.info("info for JobID [" + coordJob.getId() +
"] " + numWaitingActions
+ + " actions already waiting. MatThrottle
is : " + coordJob.getMatThrottling());
+ rejected++;
continue;
}
queueCallable(new
CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
}
}
-
+ if (materializeJobs.size() == limit && rejected > 0) {
+ return true;
+ }
}
catch (JPAExecutorException jex) {
LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
}
- catch (Exception ex) {
- LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
- }
+ return false;
}
/**
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
Tue Sep 17 23:05:14 2013
@@ -130,8 +130,9 @@ public class StatusTransitService implem
if (b1.getId().equals(b2.getId())) {
return 0;
}
- else
+ else {
return 1;
+ }
}
});
s.addAll(pendingJobList);
@@ -289,23 +290,15 @@ public class StatusTransitService implem
if ((isDoneMaterialization || coordStatus[0] ==
Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
&& checkCoordTerminalStatus(coordActionStatus,
nonPendingCoordActionsCount,
coordStatus, isDoneMaterialization)) {
- LOG.info("Set coordinator job [" + jobId + "]
status to '" + coordStatus[0].toString()
- + "' from '" + coordJob.getStatus() + "'");
updateCoordJob(isPending, coordJob,
coordStatus[0]);
}
else if (checkCoordPausedStatus(coordActionStatus,
nonPendingCoordActionsCount, coordStatus)) {
- LOG.info("Set coordinator job [" + jobId + "]
status to " + coordStatus[0].toString()
- + "' from '" + coordJob.getStatus() + "'");
updateCoordJob(isPending, coordJob,
coordStatus[0]);
}
else if(checkCoordSuspendStatus( coordActionStatus,
nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(),
isPending)) {
- LOG.info("Set coordinator job [" + jobId + "]
status to " + coordStatus[0].toString()
- + "' from '" + coordJob.getStatus() + "'");
updateCoordJob(isPending, coordJob,
coordStatus[0]);
}
else if (checkCoordRunningStatus(coordActionStatus,
nonPendingCoordActionsCount, coordStatus)) {
- LOG.info("Set coordinator job [" + jobId + "]
status to " + coordStatus[0].toString()
- + "' from '" + coordJob.getStatus() + "'");
updateCoordJob(isPending, coordJob,
coordStatus[0]);
}
else {
@@ -662,7 +655,8 @@ public class StatusTransitService implem
// Backward support when coordinator namespace is 0.1
coordJob.setStatus(StatusUtils.getStatus(coordJob));
if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
- LOG.debug("Updating coord job " + coordJob.getId());
+ LOG.info("Set coordinator job [" + coordJob.getId() + "]
status to '" + coordJob.getStatus() + "' from '"
+ + prevStatus + "'");
coordJob.setLastModifiedTime(new Date());
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
coordJob);
}
@@ -681,14 +675,14 @@ public class StatusTransitService implem
boolean prevPending = coordJob.isPending();
if (isPending) {
coordJob.setPending();
- LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
}
else {
coordJob.resetPending();
- LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
}
boolean hasChange = prevPending != coordJob.isPending();
if (saveToDB && hasChange) {
+ LOG.info("Change coordinator job [" + coordJob.getId() + "]
pending to '" + coordJob.isPending()
+ + "' from '" + prevPending + "'");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
coordJob);
}
return hasChange;
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
Tue Sep 17 23:05:14 2013
@@ -23,11 +23,14 @@ import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import
org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.test.XDataTestCase;
@@ -102,6 +105,36 @@ public class TestCoordMaterializeTrigger
assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
}
+ public void testCoordMaterializeTriggerService3() throws Exception {
+ Services.get().destroy();
+
setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT,
"1");
+ services = new Services();
+ services.init();
+
+ Date start = new Date();
+ Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+ CoordinatorJobBean job1 =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false,
false, 1);
+ addRecordToCoordActionTable(job1.getId(), 2,
CoordinatorAction.Status.WAITING,
+ "coord-action-get.xml", 0);
+ job1.setMatThrottling(1);
+
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
job1);
+
+ CoordinatorJobBean job2 =
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, start, end, false, false,
0);
+ CoordinatorJobBean job3 =
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, start, end, false, false,
0);
+
+ Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+ runnable.run();
+ sleep(1000);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ // second job is beyond limit but still should be picked up
+ job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+ assertEquals(CoordinatorJob.Status.RUNNING, job2.getStatus());
+ // third job not picked up because limit iteration only twice
+ job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+ assertEquals(CoordinatorJob.Status.PREP, job3.getStatus());
+ }
+
@Override
protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status,
Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum)
throws Exception {
Path appPath = new Path(getFsTestCaseDir(), "coord");
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Sep 17 23:05:14 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1539 Load more coordinator jobs eligible to be materialized in MaterializeTriggerService (mona)
OOZIE-1528 CoordRerunX and ActionEndX not updating some of the modified beans. (virag)
OOZIE-1540 When oozie.zookeeper.oozie.id is not specified, its using a space instead of the hostname (rkanter)
OOZIE-1509 Do not preload all tabs in Oozie UI and make Active Jobs default
(mona)