Author: mona
Date: Tue Sep 17 23:05:14 2013
New Revision: 1524248

URL: http://svn.apache.org/r1524248
Log:
OOZIE-1539 Load more coordinator jobs eligible to be materialized in 
MaterializeTriggerService (mona)

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
 Tue Sep 17 23:05:14 2013
@@ -109,48 +109,61 @@ public class CoordMaterializeTriggerServ
         private void runCoordJobMatLookup() {
             XLog.Info.get().clear();
             XLog LOG = XLog.getLog(getClass());
-            JPAService jpaService = Services.get().get(JPAService.class);
             try {
-
                 // get current date
                 Date currDate = new Date(new Date().getTime() + lookupInterval 
* 1000);
                 // get list of all jobs that have actions that should be 
materialized.
                 int materializationLimit = Services.get().getConf()
                         .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, 
CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
-                CoordJobsToBeMaterializedJPAExecutor cmatcmd = new 
CoordJobsToBeMaterializedJPAExecutor(currDate,
-                        materializationLimit);
+                // account for under-utilization of limit due to jobs maxed out
+                // against mat_throttle. hence repeat
+                if (materializeCoordJobs(currDate, materializationLimit, LOG)) 
{
+                    materializeCoordJobs(currDate, materializationLimit, LOG);
+                }
+            }
+
+            catch (Exception ex) {
+                LOG.error("Exception while attempting to materialize 
coordinator jobs, {0}", ex.getMessage(), ex);
+            }
+        }
+
+        private boolean materializeCoordJobs(Date currDate, int limit, XLog 
LOG) {
+            try {
+                JPAService jpaService = Services.get().get(JPAService.class);
+                CoordJobsToBeMaterializedJPAExecutor cmatcmd = new 
CoordJobsToBeMaterializedJPAExecutor(currDate, limit);
                 List<CoordinatorJobBean> materializeJobs = 
jpaService.execute(cmatcmd);
+                int rejected = 0;
                 LOG.info("CoordMaterializeTriggerService - Curr Date= " + 
currDate + ", Num jobs to materialize = "
                         + materializeJobs.size());
                 for (CoordinatorJobBean coordJob : materializeJobs) {
                     if 
(Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId()))
 {
                         Services.get().get(InstrumentationService.class).get()
                                 .incr(INSTRUMENTATION_GROUP, 
INSTR_MAT_JOBS_COUNTER, 1);
-                        int numWaitingActions = jpaService
-                                .execute(new 
CoordActionsActiveCountJPAExecutor(coordJob.getId()));
+                        int numWaitingActions = jpaService.execute(new 
CoordActionsActiveCountJPAExecutor(coordJob
+                                .getId()));
                         LOG.info("Job :" + coordJob.getId() + "  
numWaitingActions : " + numWaitingActions
                                 + " MatThrottle : " + 
coordJob.getMatThrottling());
-                        // update lastModifiedTime so next time others might 
have higher chance to get pick up
+                        // update lastModifiedTime so next time others get 
picked up in LRU fashion
                         coordJob.setLastModifiedTime(new Date());
                         CoordJobQueryExecutor.getInstance().executeUpdate(
-                                
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
-                                coordJob);
+                                
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 
coordJob);
                         if (numWaitingActions >= coordJob.getMatThrottling()) {
-                            LOG.info("info for JobID [" + coordJob.getId() + " 
already waiting "
-                                    + numWaitingActions + " actions. 
MatThrottle is : " + coordJob.getMatThrottling());
+                            LOG.info("info for JobID [" + coordJob.getId() + 
"] " + numWaitingActions
+                                    + " actions already waiting. MatThrottle 
is : " + coordJob.getMatThrottling());
+                            rejected++;
                             continue;
                         }
                         queueCallable(new 
CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                     }
                 }
-
+                if (materializeJobs.size() == limit && rejected > 0) {
+                    return true;
+                }
             }
             catch (JPAExecutorException jex) {
                 LOG.warn("JPAExecutorException while attempting to materialize 
coordinator jobs", jex);
             }
-            catch (Exception ex) {
-                LOG.error("Exception while attempting to materialize 
coordinator jobs, {0}", ex.getMessage(), ex);
-            }
+            return false;
         }
 
         /**

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
 Tue Sep 17 23:05:14 2013
@@ -130,8 +130,9 @@ public class StatusTransitService implem
                     if (b1.getId().equals(b2.getId())) {
                         return 0;
                     }
-                    else
+                    else {
                         return 1;
+                    }
                 }
             });
             s.addAll(pendingJobList);
@@ -289,23 +290,15 @@ public class StatusTransitService implem
                         if ((isDoneMaterialization || coordStatus[0] == 
Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
                                 && checkCoordTerminalStatus(coordActionStatus, 
nonPendingCoordActionsCount,
                                         coordStatus, isDoneMaterialization)) {
-                            LOG.info("Set coordinator job [" + jobId + "] 
status to '" + coordStatus[0].toString()
-                                    + "' from '" + coordJob.getStatus() + "'");
                             updateCoordJob(isPending, coordJob, 
coordStatus[0]);
                         }
                         else if (checkCoordPausedStatus(coordActionStatus, 
nonPendingCoordActionsCount, coordStatus)) {
-                            LOG.info("Set coordinator job [" + jobId + "] 
status to " + coordStatus[0].toString()
-                                    + "' from '" + coordJob.getStatus() + "'");
                             updateCoordJob(isPending, coordJob, 
coordStatus[0]);
                         }
                         else if(checkCoordSuspendStatus( coordActionStatus, 
nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), 
isPending)) {
-                            LOG.info("Set coordinator job [" + jobId + "] 
status to " + coordStatus[0].toString()
-                                    + "' from '" + coordJob.getStatus() + "'");
                             updateCoordJob(isPending, coordJob, 
coordStatus[0]);
                         }
                         else if (checkCoordRunningStatus(coordActionStatus, 
nonPendingCoordActionsCount, coordStatus)) {
-                            LOG.info("Set coordinator job [" + jobId + "] 
status to " + coordStatus[0].toString()
-                                    + "' from '" + coordJob.getStatus() + "'");
                             updateCoordJob(isPending, coordJob, 
coordStatus[0]);
                         }
                         else {
@@ -662,7 +655,8 @@ public class StatusTransitService implem
             // Backward support when coordinator namespace is 0.1
             coordJob.setStatus(StatusUtils.getStatus(coordJob));
             if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
-                LOG.debug("Updating coord job " + coordJob.getId());
+                LOG.info("Set coordinator job [" + coordJob.getId() + "] 
status to '" + coordJob.getStatus() + "' from '"
+                        + prevStatus + "'");
                 coordJob.setLastModifiedTime(new Date());
                 
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
 coordJob);
             }
@@ -681,14 +675,14 @@ public class StatusTransitService implem
             boolean prevPending = coordJob.isPending();
             if (isPending) {
                 coordJob.setPending();
-                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to 
TRUE");
             }
             else {
                 coordJob.resetPending();
-                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to 
FALSE");
             }
             boolean hasChange = prevPending != coordJob.isPending();
             if (saveToDB && hasChange) {
+                LOG.info("Change coordinator job [" + coordJob.getId() + "] 
pending to '" + coordJob.isPending()
+                        + "' from '" + prevPending + "'");
                 
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
 coordJob);
             }
             return hasChange;

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
 Tue Sep 17 23:05:14 2013
@@ -23,11 +23,14 @@ import java.util.Date;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import 
org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
@@ -102,6 +105,36 @@ public class TestCoordMaterializeTrigger
         assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
     }
 
+    public void testCoordMaterializeTriggerService3() throws Exception {
+        Services.get().destroy();
+        
setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT,
 "1");
+        services = new Services();
+        services.init();
+
+        Date start = new Date();
+        Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, 
false, 1);
+        addRecordToCoordActionTable(job1.getId(), 2, 
CoordinatorAction.Status.WAITING,
+                "coord-action-get.xml", 0);
+        job1.setMatThrottling(1);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 job1);
+
+        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, start, end, false, false, 
0);
+        CoordinatorJobBean job3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, start, end, false, false, 
0);
+
+        Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+        runnable.run();
+        sleep(1000);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        // second job is beyond limit but still should be picked up
+        job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+        assertEquals(CoordinatorJob.Status.RUNNING, job2.getStatus());
+        // third job not picked up because limit iteration only twice
+        job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+        assertEquals(CoordinatorJob.Status.PREP, job3.getStatus());
+    }
+
     @Override
     protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, 
Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) 
throws Exception {
         Path appPath = new Path(getFsTestCaseDir(), "coord");

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1524248&r1=1524247&r2=1524248&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Sep 17 23:05:14 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1539 Load more coordinator jobs eligible to be materialized in 
MaterializeTriggerService (mona)
 OOZIE-1528 CoordRerunX and ActionEndX not updating some of the modified beans. 
(virag)
 OOZIE-1540 When oozie.zookeeper.oozie.id is not specified, its using a space 
instead of the hostname (rkanter)
 OOZIE-1509 Do not preload all tabs in Oozie UI and make Active Jobs default 
(mona)


Reply via email to