Repository: oozie Updated Branches: refs/heads/master 5a598039a -> 3f7a0c562
OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3f7a0c56 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3f7a0c56 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3f7a0c56 Branch: refs/heads/master Commit: 3f7a0c562252e55d16cde15991a00293b1122efc Parents: 5a59803 Author: Purshotam Shah <[email protected]> Authored: Tue Sep 1 14:43:12 2015 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue Sep 1 14:43:12 2015 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorActionBean.java | 4 +- .../coord/CoordPushDependencyCheckXCommand.java | 14 +++ .../executor/jpa/CoordActionQueryExecutor.java | 29 ++++- .../CoordActionsGetForRecoveryJPAExecutor.java | 125 ------------------- ...dActionsGetReadyGroupbyJobIDJPAExecutor.java | 72 ----------- .../apache/oozie/service/RecoveryService.java | 39 +++--- .../oozie/service/TestRecoveryService.java | 52 ++++++-- release-log.txt | 1 + 8 files changed, 110 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index 85b7ed4..c3d4bb4 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -143,7 +143,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), - @NamedQuery(name 
= "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), + @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"), @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), // Select query used by rerun, requires almost all columns so select * is used @@ -161,8 +161,6 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), - @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"), - @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr 
<> 'IGNORED')"), @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')") http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java index cc34627..fbb2c6c 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java @@ -100,6 +100,20 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> @Override protected Void execute() throws CommandException { + // this action should only get processed if current time > nominal time; + // otherwise, requeue this action for delay execution; + Date nominalTime = coordAction.getNominalTime(); + Date currentTime = new Date(); + if (nominalTime.compareTo(currentTime) > 0) { + queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), Math.max((nominalTime.getTime() - currentTime + .getTime()), getCoordPushCheckRequeueInterval())); + updateCoordAction(coordAction, false); + LOG.info("[" + actionId + + "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. 
Current=" + + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime)); + return null; + } + String pushMissingDeps = coordAction.getPushMissingDependencies(); if (pushMissingDeps == null || pushMissingDeps.length() == 0) { LOG.info("Nothing to check. Empty push missing dependency"); http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index c6a60a1..6c7f4be 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -57,7 +57,9 @@ public class CoordActionQueryExecutor extends GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, GET_TERMINATED_ACTIONS_FOR_DATES, GET_TERMINATED_ACTION_IDS_FOR_DATES, - GET_ACTIVE_ACTIONS_FOR_DATES + GET_ACTIVE_ACTIONS_FOR_DATES, + GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, + GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -199,6 +201,13 @@ public class CoordActionQueryExecutor extends query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime())); query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime())); break; + case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: + query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); + break; + case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: + query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); + query.setParameter("currentTime", new Timestamp(new Date().getTime())); + break; default: throw new 
JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " @@ -293,6 +302,24 @@ public class CoordActionQueryExecutor extends bean.setNominalTime((Timestamp) arr[5]); bean.setCreatedTime((Timestamp) arr[6]); break; + case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String)arr[0]); + bean.setJobId((String)arr[1]); + bean.setStatusStr((String) arr[2]); + bean.setExternalId((String) arr[3]); + bean.setPending((Integer) arr[4]); + break; + case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String)arr[0]); + bean.setJobId((String)arr[1]); + bean.setStatusStr((String) arr[2]); + bean.setExternalId((String) arr[3]); + bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java deleted file mode 100644 index bba9cc1..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.StringBlob; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.util.ParamChecker; - -public class CoordActionsGetForRecoveryJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> { - - private long checkAgeSecs = 0; - - public CoordActionsGetForRecoveryJPAExecutor(final long checkAgeSecs) { - ParamChecker.notNull(checkAgeSecs, "checkAgeSecs"); - this.checkAgeSecs = checkAgeSecs; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() - */ - @Override - public String getName() { - return "CoordActionsGetForRecoveryJPAExecutor"; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) - */ - @SuppressWarnings("unchecked") - @Override - public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException { - List<CoordinatorActionBean> allActions = new ArrayList<CoordinatorActionBean>(); - - try { - Query q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN"); - Timestamp ts = new Timestamp(System.currentTimeMillis() - this.checkAgeSecs * 1000); - q.setParameter("lastModifiedTime", ts); - List<Object[]> objectArrList = q.getResultList(); - for (Object[] arr : objectArrList) { - 
CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForRecovery(arr); - allActions.add(caa); - } - - q = em.createNamedQuery("GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN"); - q.setParameter("lastModifiedTime", ts); - objectArrList = q.getResultList(); - for (Object[] arr : objectArrList) { - CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForWaiting(arr); - allActions.add(caa); - } - - return allActions; - } - catch (IllegalStateException e) { - throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e); - } - } - - private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForRecovery(Object[] arr) { - CoordinatorActionBean bean = new CoordinatorActionBean(); - if (arr[0] != null) { - bean.setId((String) arr[0]); - } - if (arr[1] != null){ - bean.setJobId((String) arr[1]); - } - if (arr[2] != null) { - bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2])); - } - if (arr[3] != null) { - bean.setExternalId((String) arr[3]); - } - if (arr[4] != null) { - bean.setPending((Integer) arr[4]); - } - return bean; - } - - - private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForWaiting(Object[] arr){ - CoordinatorActionBean bean = new CoordinatorActionBean(); - if (arr[0] != null) { - bean.setId((String) arr[0]); - } - if (arr[1] != null){ - bean.setJobId((String) arr[1]); - } - if (arr[2] != null) { - bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2])); - } - if (arr[3] != null) { - bean.setExternalId((String) arr[3]); - } - if (arr[4] != null) { - bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); - } - return bean; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java deleted file mode 100644 index 3a85e5c..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; -import org.apache.oozie.util.ParamChecker; - -public class CoordActionsGetReadyGroupbyJobIDJPAExecutor implements JPAExecutor<List<String>>{ - private long checkAgeSecs = 0; - - public CoordActionsGetReadyGroupbyJobIDJPAExecutor(final long checkAgeSecs) { - ParamChecker.notNull(checkAgeSecs, "checkAgeSecs"); - this.checkAgeSecs = checkAgeSecs; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() - */ - @Override - public String getName() { - return "CoordActionsGetReadyGroupbyJobIDJPAExecutor"; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) - */ - @Override - public List<String> execute(EntityManager em) throws JPAExecutorException { - List<String> jobids = new ArrayList<String>(); - try { - Query q = em.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID"); - Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); - q.setParameter("lastModifiedTime", ts); - List<Object[]> list = q.getResultList(); - - for (Object[] arr : list) { - if (arr != null && arr[0] != null) { - jobids.add((String) arr[0]); - } - } - - return jobids; - } - catch (IllegalStateException e) { - throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/service/RecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java index 4b4a3f2..49f47d0 100644 --- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java +++ 
b/core/src/main/java/org/apache/oozie/service/RecoveryService.java @@ -20,9 +20,12 @@ package org.apache.oozie.service; import java.io.IOException; import java.io.StringReader; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.BundleActionBean; @@ -50,8 +53,8 @@ import org.apache.oozie.command.wf.SignalXCommand; import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; -import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor; -import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; @@ -145,7 +148,6 @@ public class RecoveryService implements Service { jpaService = Services.get().get(JPAService.class); runWFRecovery(); runCoordActionRecovery(); - runCoordActionRecoveryForReady(); runBundleRecovery(); log.debug("QUEUING [{0}] for potential recovery", msg.toString()); boolean ret = false; @@ -242,13 +244,20 @@ public class RecoveryService implements Service { * Recover coordinator actions that are staying in WAITING or SUBMITTED too long */ private void runCoordActionRecovery() { + Set<String> readyJobs = new HashSet<String>(); XLog.Info.get().clear(); XLog log = XLog.getLog(getClass()); long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL); long pushMissingDepDelay = pushMissingDepInterval; - List<CoordinatorActionBean> cactions = null; + Timestamp 
ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000); + + List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>(); try { - cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan)); + cactions.addAll(CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts)); + cactions.addAll(CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts)); + } catch (JPAExecutorException ex) { log.warn("Error reading coord actions from database", ex); @@ -301,30 +310,30 @@ public class RecoveryService implements Service { log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId()); } } + else if (caction.getStatus() == CoordinatorActionBean.Status.READY) { + readyJobs.add(caction.getJobId()); + } } } catch (Exception ex) { log.error("Exception, {0}", ex.getMessage(), ex); } } - - + runCoordActionRecoveryForReady(readyJobs); } /** * Recover coordinator actions that are staying in READY too long */ - private void runCoordActionRecoveryForReady() { + private void runCoordActionRecoveryForReady(Set<String> jobIds) { XLog.Info.get().clear(); XLog log = XLog.getLog(getClass()); - + List<String> coordJobIds = new ArrayList<String>(jobIds); try { - List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan)); - jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids); - msg.append(", COORD_READY_JOBS : " + jobids.size()); - for (String jobid : jobids) { - queueCallable(new CoordActionReadyXCommand(jobid)); - + coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds); + msg.append(", COORD_READY_JOBS : " + coordJobIds.size()); + for (String jobid : coordJobIds) { + queueCallable(new CoordActionReadyXCommand(jobid)); log.info("Recover READY coord 
actions for jobid :" + jobid); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index c6ecd76..13d8e8d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -52,6 +52,8 @@ import org.apache.oozie.dependency.HCatURIHandler; import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; @@ -383,9 +385,27 @@ public class TestRecoveryService extends XDataTestCase { CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml", CoordinatorJob.Status.RUNNING, false, true); + CoordinatorJobBean jobWithError = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml", + CoordinatorJob.Status.RUNNINGWITHERROR, false, true); + + CoordinatorJobBean suspendedJob = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml", + CoordinatorJob.Status.SUSPENDED, false, true); + CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml"); + CoordinatorActionBean actionReady = addRecordToCoordActionTableForWaiting(job.getId(), 2, + 
CoordinatorAction.Status.READY, "coord-action-for-action-input-check.xml"); + + CoordinatorActionBean suspendedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 1, + CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml"); + + CoordinatorActionBean runningWithErrorAction = addRecordToCoordActionTableForWaiting(jobWithError.getId(), 1, + CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml"); + + CoordinatorActionBean submittedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 2, + CoordinatorAction.Status.SUBMITTED, "coord-action-for-action-input-check.xml"); + createDir(new File(getTestCaseDir(), "/2009/29/")); createDir(new File(getTestCaseDir(), "/2009/22/")); createDir(new File(getTestCaseDir(), "/2009/15/")); @@ -397,24 +417,36 @@ public class TestRecoveryService extends XDataTestCase { recoveryRunnable.run(); final String actionId = action.getId(); - final JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); waitFor(10000, new Predicate() { public boolean evaluate() throws Exception { - CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId); - CoordinatorActionBean newAction = jpaService.execute(coordGetCmd); + CoordinatorActionBean newAction = CoordActionQueryExecutor.getInstance().get( + CoordActionQuery.GET_COORD_ACTION, actionId); return (newAction.getStatus() != CoordinatorAction.Status.WAITING); } }); - CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId); - action = jpaService.execute(coordGetCmd); - if (action.getStatus() == CoordinatorAction.Status.WAITING) { - fail("recovery waiting coord action failed, action is WAITING"); - } - } + action = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId); + // action status should change from waiting + assertFalse(action.getStatus().equals(CoordinatorAction.Status.WAITING)); + // action status should change 
from waiting + assertFalse(CoordActionQueryExecutor.getInstance() + .get(CoordActionQuery.GET_COORD_ACTION, runningWithErrorAction.getId()).getStatus() + .equals(CoordinatorAction.Status.WAITING)); + // action status should change from ready + assertFalse(CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionReady.getId()) + .getStatus().equals(CoordinatorAction.Status.READY)); + // action status should remain WAITING because the job is suspended + assertTrue(CoordActionQueryExecutor.getInstance() + + .get(CoordActionQuery.GET_COORD_ACTION, suspendedAction.getId()).getStatus() + .equals(CoordinatorAction.Status.WAITING)); + // action status should remain SUBMITTED because the job is suspended + assertEquals( + CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, submittedAction.getId()) + .getStatus(), (CoordinatorAction.Status.SUBMITTED)); + } public void testCoordActionRecoveryServiceForWaitingRegisterPartition() throws Exception { services.destroy(); http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index a7c180b..7174312 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs (puru) OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter) OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter) OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter)
