Repository: oozie Updated Branches: refs/heads/master 5ab4c2515 -> b5a4e06ba
OOZIE-2127 Add created time to RecoveryService WF queries Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b5a4e06b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b5a4e06b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b5a4e06b Branch: refs/heads/master Commit: b5a4e06ba0ad5e0ac63bd876223182cfbd2a219c Parents: 5ab4c25 Author: Purshotam Shah <[email protected]> Authored: Wed Feb 4 12:14:24 2015 -0800 Committer: Purshotam Shah <[email protected]> Committed: Wed Feb 4 12:14:24 2015 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/WorkflowActionBean.java | 2 +- .../jpa/WorkflowActionQueryExecutor.java | 2 ++ .../apache/oozie/service/RecoveryService.java | 12 ++++++- core/src/main/resources/oozie-default.xml | 8 +++++ .../oozie/service/TestRecoveryService.java | 33 ++++++++++++++------ release-log.txt | 1 + 6 files changed, 47 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java index 06edf53..a6cf74a 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java @@ -104,7 +104,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"), - @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 
'RUNNING'"), + @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 'RUNNING' AND a.createdTimeTS >= :createdTime"), @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select a.id from WorkflowActionBean a where a.pending = 1 AND a.statusStr = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"), http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java index 2c459e4..0e99ae2 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java @@ -209,7 +209,9 @@ public class WorkflowActionQueryExecutor extends case GET_PENDING_ACTIONS: Long minimumPendingAgeSecs = (Long) parameters[0]; Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000); + Timestamp createdTimeInterval = new Timestamp((Long) parameters[1]); query.setParameter("pendingAge", pts); + query.setParameter("createdTime", createdTimeInterval); break; case GET_ACTIONS_FOR_WORKFLOW_RERUN: query.setParameter("wfId", parameters[0]); http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/service/RecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java index 9f31e88..4b4a3f2 100644 --- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java +++ 
b/core/src/main/java/org/apache/oozie/service/RecoveryService.java @@ -96,6 +96,9 @@ public class RecoveryService implements Service { * Age of actions to queue, in seconds. */ public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than"; + + public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval"; + /** * Age of coordinator jobs to recover, in seconds. */ @@ -111,6 +114,9 @@ public class RecoveryService implements Service { private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions"; private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions"; + public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000; + + /** * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the @@ -334,10 +340,14 @@ public class RecoveryService implements Service { XLog.Info.get().clear(); XLog log = XLog.getLog(getClass()); // queue command for action recovery + + long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL) + * ONE_DAY_MILLISCONDS; + List<WorkflowActionBean> actions = null; try { actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS, - olderThan); + olderThan, createdTimeInterval); } catch (JPAExecutorException ex) { log.warn("Exception while reading pending actions from storage", ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index fcc73b8..17da11b 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -333,6 +333,14 @@ </property> <property> + 
<name>oozie.service.RecoveryService.wf.actions.created.time.interval</name> + <value>7</value> + <description> + Created time period, in days, of the actions which are eligible to be queued for recovery. + </description> + </property> + + <property> <name>oozie.service.RecoveryService.callable.batch.size</name> <value>10</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index 62d14a0..9f8e65f 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -56,7 +56,9 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; import org.apache.oozie.service.RecoveryService.RecoveryRunnable; import org.apache.oozie.store.CoordinatorStore; import org.apache.oozie.store.StoreException; @@ -210,14 +212,21 @@ public class TestRecoveryService extends XDataTestCase { */ public void testWorkflowActionRecoveryUserRetry() throws Exception { final JPAService jpaService = Services.get().get(JPAService.class); - WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); - WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.USER_RETRY); + WorkflowJobBean job1 = 
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.USER_RETRY); + + WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action2 = createWorkflowActionSetPending(job2.getId(), WorkflowAction.Status.USER_RETRY); + //Default recovery created time is 7 days. + action2.setCreatedTime(new Date(new Date().getTime() - 8 * RecoveryService.ONE_DAY_MILLISCONDS)); + WorkflowActionInsertJPAExecutor actionInsertCmd = new WorkflowActionInsertJPAExecutor(action2); + jpaService.execute(actionInsertCmd); Runnable recoveryRunnable = new RecoveryRunnable(0, 60, 60); recoveryRunnable.run(); sleep(3000); - final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId()); + final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId()); waitFor(5000, new Predicate() { public boolean evaluate() throws Exception { @@ -225,18 +234,23 @@ public class TestRecoveryService extends XDataTestCase { return a.getExternalId() != null; } }); - action = jpaService.execute(wfActionGetCmd); - assertNotNull(action.getExternalId()); - assertEquals(WorkflowAction.Status.RUNNING, action.getStatus()); + action1 = jpaService.execute(wfActionGetCmd); + assertNotNull(action1.getExternalId()); + assertEquals(WorkflowAction.Status.RUNNING, action1.getStatus()); + + //Action 2 should not get recovered as its created time is older than 7 days + action2= WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action2.getId()); + assertNull(action2.getExternalId()); + assertEquals(WorkflowAction.Status.USER_RETRY, action2.getStatus()); - ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false); + ActionExecutorContext context = new 
ActionXCommand.ActionExecutorContext(job1, action1, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); + JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); String user = conf.get("user.name"); String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - String launcherId = action.getExternalId(); + String launcherId = action1.getExternalId(); final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); @@ -854,6 +868,7 @@ public class TestRecoveryService extends XDataTestCase { action.setType("map-reduce"); action.setTransition("transition"); action.setStatus(status); + action.setCreatedTime(new Date()); action.setStartTime(new Date()); action.setEndTime(new Date()); action.setLastCheckTime(new Date()); http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 2db4bb3..2126234 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2127 Add created time to RecoveryService WF queries (puru) OOZIE-2123 Disable launcher uber mode if classloader options are set (ryota) OOZIE-2118 add createdtime option to workflow jobs query (ryota) OOZIE-2110 cancel delegation token of launcher jobs that stay till child jobs finish (ryota)
