Repository: oozie Updated Branches: refs/heads/master c707867b7 -> 14599a0a5
OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/14599a0a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/14599a0a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/14599a0a Branch: refs/heads/master Commit: 14599a0a5c6cde5209b5f99574d65860dd9bffef Parents: c707867 Author: Rohini Palaniswamy <[email protected]> Authored: Wed Apr 23 09:45:09 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Wed Apr 23 09:45:09 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 5 +- .../CoordMaterializeTransitionXCommand.java | 65 +++++++++++++- .../executor/jpa/CoordJobQueryExecutor.java | 16 +++- .../CoordJobsToBeMaterializedJPAExecutor.java | 2 +- .../service/CoordMaterializeTriggerService.java | 48 ++++------ core/src/main/resources/oozie-default.xml | 36 +++++--- .../TestCoordMaterializeTransitionXCommand.java | 92 +++++++++++++++++++- ...estCoordJobsToBeMaterializedJPAExecutor.java | 2 + .../TestCoordMaterializeTriggerService.java | 83 ++++++++++++++++++ release-log.txt | 1 + 10 files changed, 297 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 4a2ea39..7915698 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -91,7 +91,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, 
w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"), - @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace from CoordinatorJobBean w where w.id = :id"), + @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr from CoordinatorJobBean w where w.id = :id"), @NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"), @@ -103,8 +103,11 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"), + //TODO need to remove. 
@NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"), + @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(w.id) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"), + @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"), @NamedQuery(name = "GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' or w.statusStr = 'KILLED') AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"), http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index b483799..57cbb34 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -45,6 +45,7 @@ import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.CoordMaterializeTriggerService; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Service; @@ -59,13 +60,14 @@ import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.apache.oozie.util.db.SLADbOperations; import org.jdom.Element; +import org.jdom.JDOMException; /** * Materialize actions for specified start and end time for coordinator job. 
*/ @SuppressWarnings("deprecation") public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { - private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization; + private JPAService jpaService = null; private CoordinatorJobBean coordJob = null; private String jobId = null; @@ -74,6 +76,13 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo private final int materializationWindow; private int lastActionNumber = 1; // over-ride by DB value private CoordinatorJob.Status prevStatus = null; + + static final private int lookAheadWindow = Services + .get() + .getConf() + .getInt(CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL, + CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL_DEFAULT); + /** * Default MAX timeout in minutes, after which coordinator input check will timeout */ @@ -84,6 +93,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo * * @param jobId coordinator job id * @param materializationWindow materialization window to calculate end time + * @param lookahead window */ public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { super("coord_mater", "coord_mater", 1); @@ -186,6 +196,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); + endMatdTime = getMaterializationTimeForCatchUp(endMatdTime); // if MaterializationWindow end time is greater than endTime // for job, then set it to endTime of job Date jobEndTime = coordJob.getEndTime(); @@ -197,6 +208,54 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo + ", window=" + materializationWindow); } + /** + * Get materialization for window for catch-up jobs. 
For current jobs, it returns currentMatTime. For catch-up, end + * Materialized Time = startMatdTime + MatThrottling * frequency + * + * @param currentMatTime + * @return + * @throws CommandException + * @throws JDOMException + */ + private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException { + if (currentMatTime.after(new Date())) { + return currentMatTime; + } + int frequency = 0; + try { + frequency = Integer.parseInt(coordJob.getFrequency()); + } + catch (NumberFormatException e) { + return currentMatTime; + } + + TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); + TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); + Calendar startInstance = Calendar.getInstance(appTz); + startInstance.setTime(startMatdTime); + Calendar endMatInstance = null; + Calendar previousInstance = startInstance; + for (int i = 1; i <= coordJob.getMatThrottling(); i++) { + endMatInstance = (Calendar) startInstance.clone(); + endMatInstance.add(freqTU.getCalendarUnit(), i * frequency); + if (endMatInstance.getTime().compareTo(new Date()) >= 0) { + if (previousInstance.after(currentMatTime)) { + return previousInstance.getTime(); + } + else { + return currentMatTime; + } + } + previousInstance = endMatInstance; + } + if (endMatInstance == null) { + return currentMatTime; + } + else { + return endMatInstance.getTime(); + } + } + /* (non-Javadoc) * @see org.apache.oozie.command.XCommand#verifyPrecondition() */ @@ -223,7 +282,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo if (startTime == null) { startTime = coordJob.getStartTimestamp(); - if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) { + if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) { throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId + " job's start time is not reached yet - nothing to materialize"); } @@ 
-311,7 +370,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); String frequency = coordJob.getFrequency(); - TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); + TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); Calendar start = Calendar.getInstance(appTz); start.setTime(startMatdTime); http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 67a919d..1a6ded7 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -61,7 +61,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_MATERIALIZE, GET_COORD_JOB_SUSPEND_KILL, GET_COORD_JOB_STATUS_PARENTID, - GET_COORD_JOBS_CHANGED + GET_COORD_JOBS_CHANGED, + GET_COORD_JOBS_OLDER_FOR_MATERILZATION }; private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); @@ -208,6 +209,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOBS_CHANGED: query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); break; + case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: + query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime())); + int limit = (Integer) parameters[1]; + if (limit > 0) { + query.setMaxResults(limit); + } + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + 
namedQuery.name()); @@ -288,6 +297,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean.setConfBlob((StringBlob) arr[17]); bean.setJobXmlBlob((StringBlob) arr[18]); bean.setAppNamespace((String) arr[19]); + bean.setTimeUnitStr((String) arr[20]); break; case GET_COORD_JOB_SUSPEND_KILL: bean = new CoordinatorJobBean(); @@ -311,6 +321,10 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOBS_CHANGED: bean = (CoordinatorJobBean) ret; break; + case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: + bean = new CoordinatorJobBean(); + bean.setId((String) ret); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java index ca11a24..40decff 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java @@ -55,7 +55,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException { List<CoordinatorJobBean> cjBeans; try { - Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_THAN"); + Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION"); q.setParameter("matTime", new Timestamp(this.dateInput.getTime())); if (limit > 0) { q.setMaxResults(limit); 
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java index 550eb80..ef3c3f4 100644 --- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java +++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java @@ -24,10 +24,9 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand; -import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; -import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.util.XCallable; import org.apache.oozie.util.XLog; import org.apache.oozie.util.DateUtils; @@ -43,6 +42,8 @@ public class CoordMaterializeTriggerService implements Service { * Time interval, in seconds, at which the Job materialization service will be scheduled to run. 
*/ public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval"; + + public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval"; /** * This configuration defined the duration for which job should be materialized in future */ @@ -58,7 +59,8 @@ public class CoordMaterializeTriggerService implements Service { private static final String INSTRUMENTATION_GROUP = "coord_job_mat"; private static final String INSTR_MAT_JOBS_COUNTER = "jobs"; - private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300; + public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300; + private static final int CONF_SCHEDULING_INTERVAL_DEFAULT = 300; private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600; private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50; @@ -116,11 +118,7 @@ public class CoordMaterializeTriggerService implements Service { // get list of all jobs that have actions that should be materialized. int materializationLimit = Services.get().getConf() .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT); - // account for under-utilization of limit due to jobs maxed out - // against mat_throttle. 
hence repeat - if (materializeCoordJobs(currDate, materializationLimit, LOG)) { - materializeCoordJobs(currDate, materializationLimit, LOG); - } + materializeCoordJobs(currDate, materializationLimit, LOG); } catch (Exception ex) { @@ -128,43 +126,27 @@ public class CoordMaterializeTriggerService implements Service { } } - private boolean materializeCoordJobs(Date currDate, int limit, XLog LOG) { + private void materializeCoordJobs(Date currDate, int limit, XLog LOG) throws JPAExecutorException { try { - JPAService jpaService = Services.get().get(JPAService.class); - CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate, limit); - List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd); - int rejected = 0; - LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) + ", Num jobs to materialize = " - + materializeJobs.size()); + List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit); + LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) + + ", Num jobs to materialize = " + materializeJobs.size()); for (CoordinatorJobBean coordJob : materializeJobs) { if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) { Services.get().get(InstrumentationService.class).get() .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); - int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob - .getId())); - LOG.info("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions - + " MatThrottle : " + coordJob.getMatThrottling()); - // update lastModifiedTime so next time others get picked up in LRU fashion + queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); coordJob.setLastModifiedTime(new Date()); + // TODO In place 
of calling single query, we should call bulk update. CoordJobQueryExecutor.getInstance().executeUpdate( CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob); - if (numWaitingActions >= coordJob.getMatThrottling()) { - LOG.info("info for JobID [" + coordJob.getId() + "] " + numWaitingActions - + " actions already waiting. MatThrottle is : " + coordJob.getMatThrottling()); - rejected++; - continue; - } - queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); } } - if (materializeJobs.size() == limit && rejected > 0) { - return true; - } } catch (JPAExecutorException jex) { LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex); } - return false; } /** @@ -200,10 +182,12 @@ public class CoordMaterializeTriggerService implements Service { int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT); // default is 300sec (5min) int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT); + // default is 300sec (5min) + int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, CONF_SCHEDULING_INTERVAL_DEFAULT); Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval); - services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval, + services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval, SchedulerService.Unit.SEC); } http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 0198230..47fa0e4 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -408,21 +408,29 @@ <!-- 
CoordMaterializeTriggerService --> - <property> - <name>oozie.service.CoordMaterializeTriggerService.lookup.interval - </name> - <value>300</value> - <description> Coordinator Job Lookup trigger command is scheduled at - this "interval" (in seconds).</description> - </property> + <property> + <name>oozie.service.CoordMaterializeTriggerService.lookup.interval + </name> + <value>300</value> + <description> Coordinator Job Lookup interval.(in seconds). + </description> + </property> - <property> - <name>oozie.service.CoordMaterializeTriggerService.materialization.window - </name> - <value>3600</value> - <description> Coordinator Job Lookup command materialized each job for - this next "window" duration</description> - </property> + <property> + <name>oozie.service.CoordMaterializeTriggerService.scheduling.interval + </name> + <value>300</value> + <description> The frequency at which the CoordMaterializeTriggerService will run.</description> + </property> + + <property> + <name>oozie.service.CoordMaterializeTriggerService.materialization.window + </name> + <value>3600</value> + <description> Coordinator Job Lookup command materialized each + job for this next "window" duration + </description> + </property> <property> <name>oozie.service.CoordMaterializeTriggerService.callable.batch.size</name> http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java index 6c73585..9a8d65b 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java @@ -18,7 +18,6 @@ package 
org.apache.oozie.command.coord; import java.sql.Timestamp; -import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -35,6 +34,8 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor; import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor; @@ -473,6 +474,95 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP); } + /** + * Test lookup materialization for catchup jobs + * + * @throws Exception + */ + public void testMaterizationLookup() throws Exception { + long TIME_IN_MIN = 60 * 1000; + long TIME_IN_HOURS = TIME_IN_MIN * 60; + long TIME_IN_DAY = TIME_IN_HOURS * 24; + JPAService jpaService = Services.get().get(JPAService.class); + // test with days + Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z"); + Date endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z"); + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, + 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(3); + job.setFrequency("1"); + job.setTimeUnitStr("DAY"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime()); + + // test with hours + startTime = 
DateUtils.parseDateOozieTZ("2009-02-01T01:00Z"); + endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z"); + job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(10); + job.setFrequency("1"); + job.setTimeUnitStr("HOUR"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime()); + + // test with hours, time should not pass the current time. + startTime = new Date(new Date().getTime() - TIME_IN_DAY * 3); + endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3); + job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(10); + job.setFrequency("1"); + job.setTimeUnitStr("DAY"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_DAY ), job.getNextMaterializedTime()); + + // test with hours, time should not pass the current time. 
+ startTime = new Date(new Date().getTime()); + endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3); + job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(10); + job.setFrequency("1"); + job.setTimeUnitStr("DAY"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime()); + + // for current job in min, should not exceed hour windows + startTime = new Date(new Date().getTime()); + endTime = new Date(startTime.getTime() + TIME_IN_HOURS * 24); + job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setMatThrottling(20); + job.setFrequency("5"); + job.setTimeUnitStr("MINUTE"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS), job.getNextMaterializedTime()); + + // for current job in hour, should not exceed hour windows + startTime = new Date(new Date().getTime()); + endTime = new Date(startTime.getTime() + TIME_IN_DAY * 24); + job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setMatThrottling(20); + job.setFrequency("1"); + job.setTimeUnitStr("DAY"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime()); + 
+ } + protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, Date pauseTime, String freq) throws Exception { return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq); http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java index 9c5217f..9e03928 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java @@ -118,6 +118,8 @@ public class TestCoordJobsToBeMaterializedJPAExecutor extends XFsTestCase { coordJob.setFrequency("1"); coordJob.setExecutionOrder(Execution.FIFO); coordJob.setConcurrency(1); + coordJob.setMatThrottling(1); + try { coordJob.setStartTime(DateUtils.parseDateOozieTZ("2009-12-15T01:00Z")); coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-12-17T01:00Z")); http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java index 9d4bd3b..9e85da3 100644 --- a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java +++ b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java @@ -20,6 +20,8 @@ package org.apache.oozie.service; import java.io.IOException; import java.io.Reader; 
import java.util.Date; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.oozie.CoordinatorJobBean; @@ -27,9 +29,12 @@ import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.CoordinatorJob.Execution; import org.apache.oozie.client.CoordinatorJob.Timeunit; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable; import org.apache.oozie.service.UUIDService.ApplicationType; @@ -134,6 +139,84 @@ public class TestCoordMaterializeTriggerService extends XDataTestCase { assertEquals(CoordinatorJob.Status.PREP, job3.getStatus()); } + public void testMaxMatThrottleNotPicked() throws Exception { + Services.get().destroy(); + setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "10"); + services = new Services(); + services.init(); + + Date start = new Date(); + Date end = new Date(start.getTime() + 3600 * 5 * 1000); + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1); + addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + job.setMatThrottling(3); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + JPAService jpaService = 
Services.get().get(JPAService.class); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + Date lastModifiedDate = job.getLastModifiedTime(); + Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300); + runnable.run(); + sleep(1000); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertNotSame(lastModifiedDate, job.getLastModifiedTime()); + + job.setMatThrottling(2); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + lastModifiedDate = job.getLastModifiedTime(); + runnable.run(); + sleep(1000); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertEquals(lastModifiedDate, job.getLastModifiedTime()); + } + + public void testMaxMatThrottleNotPickedMultipleJobs() throws Exception { + Services.get().destroy(); + setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "3"); + services = new Services(); + services.init(); + Date start = new Date(); + Date end = new Date(start.getTime() + 3600 * 5 * 1000); + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1); + addRecordToCoordActionTable(job1.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + addRecordToCoordActionTable(job1.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + job1.setMatThrottling(3); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job1); + + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1); + addRecordToCoordActionTable(job2.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + addRecordToCoordActionTable(job2.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + job2.setMatThrottling(3); + 
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job2); + + CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1); + addRecordToCoordActionTable(job3.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + addRecordToCoordActionTable(job3.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + job3.setMatThrottling(2); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job3); + + JPAService jpaService = Services.get().get(JPAService.class); + job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId())); + Date lastModifiedDate1 = job1.getLastModifiedTime(); + job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId())); + Date lastModifiedDate2 = job2.getLastModifiedTime(); + job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId())); + Date lastModifiedDate3 = job3.getLastModifiedTime(); + + + Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300); + runnable.run(); + sleep(1000); + + job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId())); + assertNotSame(lastModifiedDate1, job1.getLastModifiedTime()); + job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId())); + assertNotSame(lastModifiedDate2, job2.getLastModifiedTime()); + job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId())); + assertEquals(lastModifiedDate3, job3.getLastModifiedTime()); + } + @Override protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { Path appPath = new Path(getFsTestCaseDir(), "coord"); http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d2b7e44..0a58977 100644 --- a/release-log.txt +++ 
b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini) OOZIE-1797 Workflow rerun command should use existing workflow properties (puru via rohini) OOZIE-1769 An option to update coord properties/definition (puru via rohini) OOZIE-1796 Job status should not transition from KILLED (puru via rohini)
