http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/service/StatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java index 0d549a0..77dcda9 100644 --- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java +++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java @@ -18,42 +18,27 @@ package org.apache.oozie.service; -import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.TreeSet; -import java.util.Comparator; import org.apache.hadoop.conf.Configuration; -import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; -import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.Job; import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.bundle.BundleKillXCommand; -import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; +import org.apache.oozie.command.bundle.BundleStatusTransitXCommand; +import org.apache.oozie.command.coord.CoordStatusTransitXCommand; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; -import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; -import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; -import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.util.DateUtils; import org.apache.oozie.lock.LockToken; -import org.apache.oozie.util.StatusUtils; import org.apache.oozie.util.XLog; /** @@ -64,13 +49,15 @@ import org.apache.oozie.util.XLog; * SUCCEEDED. */ public class StatusTransitService implements Service { - public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; - public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; - public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status"; - public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error"; - private static int limit = -1; - private static Date lastInstanceStartTime = null; - private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); + private static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; + private static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; + public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + + "backward.support.for.coord.status"; + public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + + "backward.support.for.states.without.error"; + public static int limit = -1; + public static Date lastInstanceStartTime = null; + public final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); /** * StateTransitRunnable is the runnable which is scheduled to run at the configured interval. @@ -83,6 +70,9 @@ public class StatusTransitService implements Service { private JPAService jpaService = null; private LockToken lock; + private Set<String> coordFailedIds = new HashSet<String>(); + private Set<String> bundleFailedIds = new HashSet<String>(); + public StatusTransitRunnable() { jpaService = Services.get().get(JPAService.class); if (jpaService == null) { @@ -93,22 +83,19 @@ public class StatusTransitService implements Service { @Override public void run() { try { - Date curDate = new Date(); // records the start time of this service run; + final Date curDate = new Date(); // records the start time of this service run; // first check if there is some other instance running; - lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(), - lockTimeout); + lock = Services.get().get(MemoryLocksService.class) + .getWriteLock(StatusTransitService.class.getName(), lockTimeout); if (lock == null) { LOG.info("This StatusTransitService instance" + " will not run since there is already an instance running"); } else { LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName()); - // running coord jobs transit service coordTransit(); - // running bundle jobs transit service bundleTransit(); - lastInstanceStartTime = curDate; } } @@ -116,7 +103,6 @@ public class StatusTransitService implements Service { LOG.warn("Exception happened during StatusTransitRunnable ", ex); } finally { - // release lock; if (lock != null) { lock.release(); LOG.info("Released lock for [{0}]", StatusTransitService.class.getName()); @@ -124,22 +110,6 @@ public class StatusTransitService implements Service { } } - public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) { - Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() { - @Override - public int compare(BundleJobBean b1, BundleJobBean b2) { - if (b1.getId().equals(b2.getId())) { - return 0; - } - else { - return 1; - } - } - }); - s.addAll(pendingJobList); - return new ArrayList<BundleJobBean>(s); - } - /** * Aggregate bundle actions' status to bundle jobs * @@ -147,11 +117,13 @@ public class StatusTransitService implements Service { * @throws CommandException thrown if failed to run commands */ private void bundleTransit() throws JPAExecutorException, CommandException { - List<BundleJobBean> pendingJobCheckList = null; + List<BundleJobBean> pendingJobCheckList; + final Set<String> bundleIds = new HashSet<String>(); if (lastInstanceStartTime == null) { LOG.info("Running bundle status service first instance"); - // this is the first instance, we need to check for all pending or running jobs; + // This is the first instance, we need to check for all pending or running jobs; + // TODO currently limit is = -1. Need to do actual batching pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit)); } else { @@ -159,556 +131,30 @@ public class StatusTransitService implements Service { + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); // this is not the first instance, we should only check jobs that have actions been // updated >= start time of last service run; - List<BundleActionBean> actionList = BundleActionQueryExecutor.getInstance().getList( - BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime); - Set<String> bundleIds = new HashSet<String>(); - for (BundleActionBean action : actionList) { - bundleIds.add(action.getBundleId()); - } - pendingJobCheckList = new ArrayList<BundleJobBean>(); - for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) { - BundleJobBean bundle = BundleJobQueryExecutor.getInstance().get( - BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bundleId); - // Running bundle job might have pending false - if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING) - || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR) - || bundle.getStatus().equals(Job.Status.PAUSED) - || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) { - pendingJobCheckList.add(bundle); - } - } - } - aggregateBundleJobsStatus(pendingJobCheckList); - } - - private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException, - CommandException { - if (bundleLists != null) { - for (BundleJobBean bundleJob : bundleLists) { - try { - String jobId = bundleJob.getId(); - Job.Status[] bundleStatus = new Job.Status[1]; - bundleStatus[0] = bundleJob.getStatus(); - List<BundleActionBean> bundleActions = BundleActionQueryExecutor.getInstance().getList( - BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId); - HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); - boolean foundPending = false; - for (BundleActionBean bAction : bundleActions) { - int counter = 0; - if (bundleActionStatus.containsKey(bAction.getStatus())) { - counter = bundleActionStatus.get(bAction.getStatus()) + 1; - } - else { - ++counter; - } - bundleActionStatus.put(bAction.getStatus(), counter); - if (bAction.getCoordId() == null - && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) { - (new BundleKillXCommand(jobId)).call(); - LOG.info("Bundle job ["+ jobId - + "] has been killed since one of its coordinator job failed submission."); - } - - if (bAction.isPending()) { - foundPending = true; - } - } - - if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) { - LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() - + "' from '" + bundleJob.getStatus() + "'"); - updateBundleJob(foundPending, bundleJob, bundleStatus[0]); - } - else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) { - LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() - + "' from '" + bundleJob.getStatus() + "'"); - updateBundleJob(foundPending, bundleJob, bundleStatus[0]); - } - else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) { - LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() - + "' from '" + bundleJob.getStatus() + "'"); - updateBundleJob(foundPending, bundleJob, bundleStatus[0]); - } - else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) { - LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() - + "' from '" + bundleJob.getStatus() + "'"); - updateBundleJob(foundPending, bundleJob, bundleStatus[0]); - } - else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) { - LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() - + "' from '" + bundleJob.getStatus() + "'"); - updateBundleJob(foundPending, bundleJob, bundleStatus[0]); - } - } - catch (Exception ex) { - LOG.error("Exception happened during aggregate bundle job's status, job = " - + bundleJob.getId(), ex); - } + pendingJobCheckList = BundleJobQueryExecutor.getInstance().getList( + BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime); + } + for (BundleJobBean job : pendingJobCheckList) { + bundleIds.add(job.getId()); + } + bundleIds.addAll(bundleFailedIds); + bundleFailedIds.clear(); + for (final String jobId : bundleIds) { + try { + new BundleStatusTransitXCommand(jobId).call(); + } + catch (CommandException e) { + // Unable to acquire lock. Will try next time + if (e.getErrorCode() == ErrorCode.E0606) { + bundleFailedIds.add(jobId); + LOG.info("Unable to acquire lock for " + jobId + ". Will try next time"); } - } - - } - - private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException, - CommandException { - if (CoordList != null) { - Configuration conf = Services.get().getConf(); - boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); - - for (CoordinatorJobBean coordJob : CoordList) { - try { - // if namespace 0.1 is used and backward support is true, then ignore this coord job - if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null - && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { - continue; - } - String jobId = coordJob.getId(); - Job.Status[] coordStatus = new Job.Status[1]; - coordStatus[0] = coordJob.getStatus(); - //Get count of Coordinator actions with pending true - boolean isPending = false; - int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId)); - if (count > 0) { - isPending = true; - } - // Get status of Coordinator actions - List<CoordinatorAction.Status> coordActionStatusList = jpaService - .execute(new CoordJobGetActionsStatusJPAExecutor(jobId)); - HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>(); - - for (CoordinatorAction.Status status : coordActionStatusList) { - int counter = 0; - if (coordActionStatus.containsKey(status)) { - counter = coordActionStatus.get(status) + 1; - } - else { - ++counter; - } - coordActionStatus.put(status, counter); - } - - int nonPendingCoordActionsCount = coordActionStatusList.size(); - boolean isDoneMaterialization = coordJob.isDoneMaterialization(); - if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED) - && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, - coordStatus, isDoneMaterialization)) { - updateCoordJob(isPending, coordJob, coordStatus[0]); - } - else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { - updateCoordJob(isPending, coordJob, coordStatus[0]); - } - else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) { - updateCoordJob(isPending, coordJob, coordStatus[0]); - } - else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { - updateCoordJob(isPending, coordJob, coordStatus[0]); - } - else { - checkCoordPending(isPending, coordJob, true); - } - } - catch (Exception ex) { - LOG.error("Exception happened during aggregate coordinator job's status, job = " - + coordJob.getId(), ex); + else { + LOG.error("Error running BundleStatusTransitXCommand for job " + jobId, e); } - } - - } - } - - private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus, - List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { - boolean ret = false; - int totalValuesSucceed = 0; - if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) { - totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED); - } - int totalValuesFailed = 0; - if (bundleActionStatus.containsKey(Job.Status.FAILED)) { - totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED); - } - int totalValuesKilled = 0; - if (bundleActionStatus.containsKey(Job.Status.KILLED)) { - totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED); - } - - int totalValuesDoneWithError = 0; - if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) { - totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR); - } - - if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) { - // If all bundle action is done and bundle is killed, then don't change the status. - if (bundleStatus[0].equals(Job.Status.KILLED)) { - bundleStatus[0] = Job.Status.KILLED; - return true; - } - // If all the bundle actions are succeeded then bundle job should be succeeded. - if (bundleActions.size() == totalValuesSucceed) { - bundleStatus[0] = Job.Status.SUCCEEDED; - ret = true; - } - else if (bundleActions.size() == totalValuesKilled) { - // If all the bundle actions are KILLED then bundle job should be KILLED. - bundleStatus[0] = Job.Status.KILLED; - ret = true; - } - else if (bundleActions.size() == totalValuesFailed) { - // If all the bundle actions are FAILED then bundle job should be FAILED. - bundleStatus[0] = Job.Status.FAILED; - ret = true; - } - else { - bundleStatus[0] = Job.Status.DONEWITHERROR; - ret = true; - } - } - return ret; - } - - private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, - int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) { - boolean ret = false; - int totalValuesSucceed = 0; - if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) { - totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED); - } - int totalValuesFailed = 0; - if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) { - totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED); - } - int totalValuesKilled = 0; - if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) { - totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED); - } - int totalValuesTimeOut = 0; - if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { - totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT); - } - int totalValuesSkipped = 0; - if (coordActionStatus.containsKey(CoordinatorAction.Status.SKIPPED)) { - totalValuesSkipped = coordActionStatus.get(CoordinatorAction.Status.SKIPPED); - } - - if (coordActionsCount == - (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) { - // If all coord action is done and coord is killed, then don't change the status. - if (coordStatus[0].equals(Job.Status.KILLED)) { - coordStatus[0] = Job.Status.KILLED; - return true; - } - // If all the coordinator actions are succeeded then coordinator job should be succeeded. - if (coordActionsCount == (totalValuesSucceed + totalValuesSkipped) && isDoneMaterialization) { - coordStatus[0] = Job.Status.SUCCEEDED; - ret = true; - } - else if (coordActionsCount == totalValuesKilled) { - // If all the coordinator actions are KILLED then coordinator job should be KILLED. - coordStatus[0] = Job.Status.KILLED; - ret = true; - } - else if (coordActionsCount == totalValuesFailed) { - // If all the coordinator actions are FAILED then coordinator job should be FAILED. - coordStatus[0] = Job.Status.FAILED; - ret = true; - } - else { - coordStatus[0] = Job.Status.DONEWITHERROR; - ret = true; } } - return ret; - } - - private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus, - List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { - boolean ret = false; - if (bundleActionStatus.containsKey(Job.Status.PREP)) { - // If all the bundle actions are PREP then bundle job should be RUNNING. - if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) { - bundleStatus[0] = getRunningStatus(bundleActionStatus); - ret = true; - } - } - return ret; - } - - private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus, - List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) { - boolean ret = false; - - // TODO - When bottom up cmds are allowed to change the status of parent job, - // if none of the bundle actions are in paused or pausedwitherror, the function should return - // false - - // top down - // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error - // state, then job should be PAUSED otherwise it should be pausedwitherror - if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) { - if (bundleActionStatus.containsKey(Job.Status.KILLED) - || bundleActionStatus.containsKey(Job.Status.FAILED) - || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) - || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) - || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) - || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { - bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR; - } - else { - bundleJobStatus[0] = Job.Status.PAUSED; - } - ret = true; - } - - // bottom up; check the status of parent through their children - else if (bundleActionStatus.containsKey(Job.Status.PAUSED) - && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) { - bundleJobStatus[0] = Job.Status.PAUSED; - ret = true; - } - else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { - int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus - .get(Job.Status.PAUSED) : 0; - if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) { - bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR; - ret = true; - } - } - else { - ret = false; - } - return ret; - } - - - private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus, - List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) { - boolean ret = false; - - // TODO - When bottom up cmds are allowed to change the status of parent job, - // if none of the bundle actions are in suspended or suspendedwitherror, the function should return - // false - - // top down - // if job is suspended - if (bundleStatus[0] == Job.Status.SUSPENDED - || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) { - if (bundleActionStatus.containsKey(Job.Status.KILLED) - || bundleActionStatus.containsKey(Job.Status.FAILED) - || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) - || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) - || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { - bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR; - } - else { - bundleStatus[0] = Job.Status.SUSPENDED; - } - ret =true; - } - - // bottom up - // Update status of parent from the status of its children - else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED) - || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) { - int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus - .get(Job.Status.SUCCEEDED) : 0; - int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus - .get(Job.Status.KILLED) : 0; - int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus - .get(Job.Status.FAILED) : 0; - int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus - .get(Job.Status.DONEWITHERROR) : 0; - - if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) { - bundleStatus[0] = Job.Status.SUSPENDED; - ret = true; - } - else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR) - + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) { - bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR; - ret = true; - } - } - return ret; - - } - - private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, - int coordActionsCount, Job.Status[] coordStatus){ - boolean ret = false; - if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) { - if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) - || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) - || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { - coordStatus[0] = Job.Status.PAUSEDWITHERROR; - } - else { - coordStatus[0] = Job.Status.PAUSED; - } - ret = true; - } - return ret; - } - private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, - int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) { - boolean ret = false; - - // TODO - When bottom up cmds are allowed to change the status of parent job - //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false - //,then the function should return - // false - - // top down - // check for children only when parent is suspended - if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) { - - if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) - || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) - || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { - coordStatus[0] = Job.Status.SUSPENDEDWITHERROR; - } - else { - coordStatus[0] = Job.Status.SUSPENDED; - } - ret = true; - } - // bottom up - // look for children to check the parent's status only if materialization is - // done and all actions are non-pending - else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) { - int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus - .get(CoordinatorAction.Status.SUCCEEDED) : 0; - int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus - .get(CoordinatorAction.Status.KILLED) : 0; - int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus - .get(CoordinatorAction.Status.FAILED) : 0; - int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus - .get(CoordinatorAction.Status.TIMEDOUT) : 0; - - if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) { - coordStatus[0] = Job.Status.SUSPENDED; - ret = true; - } - else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) - + succeededActions + killedActions + failedActions + timedoutActions) { - coordStatus[0] = Job.Status.SUSPENDEDWITHERROR; - ret = true; - } - } - return ret; - } - - private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, - int coordActionsCount, Job.Status[] coordStatus) { - boolean ret = false; - if (coordStatus[0] != Job.Status.PREP) { - if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) - || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) - || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { - coordStatus[0] = Job.Status.RUNNINGWITHERROR; - } - else { - coordStatus[0] = Job.Status.RUNNING; - } - ret = true; - } - return ret; - } - - private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus, - List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { - boolean ret = false; - if (bundleStatus[0] != Job.Status.PREP) { - bundleStatus[0] = getRunningStatus(bundleActionStatus); - ret = true; - } - return ret; - - } - - private Job.Status getRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus) { - if (bundleActionStatus.containsKey(Job.Status.FAILED) - || bundleActionStatus.containsKey(Job.Status.KILLED) - || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) - || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) { - return Job.Status.RUNNINGWITHERROR; - } - else { - return Job.Status.RUNNING; - } - } - - private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus) - throws JPAExecutorException { - String jobId = bundleJob.getId(); - // Update the Bundle Job - // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and - // PAUSEDWITHERROR is not supported - bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus)); - if (isPending) { - bundleJob.setPending(); - LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); - } - else { - bundleJob.resetPending(); - LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); - } - BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, - bundleJob); - } - - private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus) - throws JPAExecutorException, CommandException { - Job.Status prevStatus = coordJob.getStatus(); - // Update the Coord Job - if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED - || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) { - if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) { - LOG.info("Coord Job [" + coordJob.getId() - + "] status to "+ coordStatus +" can not be updated as its already in Terminal state"); - return; - } - } - - boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false); - // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is - // not supported - coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus)); - // Backward support when coordinator namespace is 0.1 - coordJob.setStatus(StatusUtils.getStatus(coordJob)); - if (coordJob.getStatus() != prevStatus || isPendingStateChanged) { - LOG.info("Set coordinator job [" + coordJob.getId() + "] status to '" + coordJob.getStatus() + "' from '" - + prevStatus + "'"); - coordJob.setLastModifiedTime(new Date()); - CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob); - } - // update bundle action only when status changes in coord job - if (coordJob.getBundleId() != null) { - if (!prevStatus.equals(coordJob.getStatus())) { - BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); - bundleStatusUpdate.call(); - } - } - } - - private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) - throws JPAExecutorException { - // Checking the coordinator pending should be updated or not - boolean prevPending = coordJob.isPending(); - if (isPending) { - coordJob.setPending(); - } - else { - coordJob.resetPending(); - } - boolean hasChange = prevPending != coordJob.isPending(); - if (saveToDB && hasChange) { - LOG.info("Change coordinator job [" + coordJob.getId() + "] pending to '" + coordJob.isPending() - + "' from '" + prevPending + "'"); - CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob); - } - return hasChange; - } /** @@ -719,6 +165,7 @@ public class StatusTransitService implements Service { */ private void coordTransit() throws JPAExecutorException, CommandException { List<CoordinatorJobBean> pendingJobCheckList = null; + final Set<String> coordIds = new HashSet<String>(); if (lastInstanceStartTime == null) { LOG.info("Running coordinator status service first instance"); // this is the first instance, we need to check for all pending jobs; @@ -727,44 +174,36 @@ public class StatusTransitService implements Service { else { LOG.info("Running coordinator status service from last instance time = " + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); - // this is not the first instance, we should only check jobs - // that have actions or jobs been - // updated >= start time of last service run; - List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList( - CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime); - Set<String> coordIds = new HashSet<String>(); - for (CoordinatorActionBean action : actionsList) { - coordIds.add(action.getJobId()); + // this is not the first instance, we should only check jobs. + // that have actions or jobs been updated >= start time of last service run; + pendingJobCheckList = CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime); + + pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime)); + } + for (final CoordinatorJobBean job : pendingJobCheckList) { + coordIds.add(job.getId()); + } + coordIds.addAll(coordFailedIds); + coordFailedIds.clear(); + for (final String coordId : coordIds) { + try { + new CoordStatusTransitXCommand(coordId).call(); } + catch (CommandException e) { + // Unable to acquire lock. Will try next time + if (e.getErrorCode() == ErrorCode.E0606) { + coordFailedIds.add(coordId); + LOG.info("Unable to acquire lock for " + coordId + ". Will try next time"); - pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); - for (String coordId : coordIds.toArray(new String[coordIds.size()])) { - CoordinatorJobBean coordJob; - try { - coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId); - } - catch (JPAExecutorException jpaee) { - if (jpaee.getErrorCode().equals(ErrorCode.E0604)) { - LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee); - continue; - } else { - throw jpaee; - } } - // Running coord job might have pending false - Job.Status coordJobStatus = coordJob.getStatus(); - if ((coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED) - || coordJobStatus.equals(Job.Status.RUNNING) - || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR) - || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) - && !coordJobStatus.equals(Job.Status.IGNORED)) { - pendingJobCheckList.add(coordJob); + else { + LOG.error("Error running CoordStatusTransitXCommand for job " + coordId, e); } + } - pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList( - CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime)); } - aggregateCoordJobsStatus(pendingJobCheckList); } } @@ -775,7 +214,7 @@ public class StatusTransitService implements Service { */ @Override public void init(Services services) { - Configuration conf = services.getConf(); + final Configuration conf = services.getConf(); Runnable stateTransitRunnable = new StatusTransitRunnable(); services.get(SchedulerService.class).schedule(stateTransitRunnable, 10, conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC); @@ -797,5 +236,5 @@ public class StatusTransitService implements Service { public Class<? extends Service> getInterface() { return StatusTransitService.class; } -} +}
http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java index 1fbc3d8..11a1610 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.executor.jpa; import java.sql.Timestamp; @@ -97,10 +96,6 @@ public class TestBundleActionQueryExecutor extends XDataTestCase { assertEquals(query.getParameterValue("bundleActionId"), bean.getBundleId()); query = BundleActionQueryExecutor.getInstance().getSelectQuery( - BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, em, bean.getLastModifiedTime()); - assertEquals(query.getParameterValue("lastModifiedTime"), bean.getLastModifiedTimestamp()); - - query = BundleActionQueryExecutor.getInstance().getSelectQuery( BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, em, (long) 100); Date date = DateUtils.toDate((Timestamp) (query.getParameterValue("lastModifiedTime"))); assertTrue(date.before(Calendar.getInstance().getTime())); @@ -138,20 +133,10 @@ public class TestBundleActionQueryExecutor extends XDataTestCase { public void testGetList() throws Exception { BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); - BundleActionBean bean1 = this.addRecordToBundleActionTable(job.getId(), "coord1", 0, Job.Status.PREP); - BundleActionBean bean2 = this.addRecordToBundleActionTable(job.getId(), "coord2", 1, Job.Status.RUNNING); - BundleActionBean bean3 = this.addRecordToBundleActionTable(job.getId(), "coord3", 1, Job.Status.RUNNING); - // GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME - Date timeBefore = new Date(bean1.getLastModifiedTime().getTime() - 1000 * 60); + this.addRecordToBundleActionTable(job.getId(), "coord1", 0, Job.Status.PREP); + this.addRecordToBundleActionTable(job.getId(), "coord2", 1, Job.Status.RUNNING); + this.addRecordToBundleActionTable(job.getId(), "coord3", 1, Job.Status.RUNNING); List<BundleActionBean> bActions = BundleActionQueryExecutor.getInstance().getList( - BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, timeBefore); - assertEquals(3, bActions.size()); - Date timeAfter = new Date(bean3.getLastModifiedTime().getTime() + 100 * 60); - bActions = BundleActionQueryExecutor.getInstance().getList( - BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, timeAfter); - assertEquals(0, bActions.size()); - // GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN - bActions = BundleActionQueryExecutor.getInstance().getList( BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, (long) (1000 * 60)); assertEquals(0, bActions.size()); bActions = BundleActionQueryExecutor.getInstance().getList( @@ -178,4 +163,4 @@ public class TestBundleActionQueryExecutor extends XDataTestCase { assertEquals(retBean.getCoordName(), "testApp"); assertEquals(retBean.getStatus(), Job.Status.RUNNING); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java index 0326b47..509eedd 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.executor.jpa; +import java.util.Date; +import java.util.List; + import javax.persistence.EntityManager; import javax.persistence.Query; +import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.client.Job; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; @@ -141,6 +144,24 @@ public class TestBundleJobQueryExecutor extends XDataTestCase { assertEquals(bean.getId(), retBean.getId()); } + public void testBundleIDsForStatusTransit() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job3 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleActionBean bean1 = this.addRecordToBundleActionTable(job1.getId(), "coord1", 0, Job.Status.PREP); + BundleActionBean bean2 = this.addRecordToBundleActionTable(job2.getId(), "coord2", 1, Job.Status.RUNNING); + BundleActionBean bean3 = this.addRecordToBundleActionTable(job3.getId(), "coord3", 1, Job.Status.RUNNING); + // GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME + Date timeBefore = new Date(bean1.getLastModifiedTime().getTime() - 1000 * 60); + List<BundleJobBean> jobBean = BundleJobQueryExecutor.getInstance().getList( + BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, timeBefore); + assertEquals(3, jobBean.size()); + Date timeAfter = new Date(bean3.getLastModifiedTime().getTime() + 100 * 60); + jobBean = BundleJobQueryExecutor.getInstance().getList(BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, + timeAfter); + assertEquals(0, jobBean.size()); + } + public void testGetList() throws Exception { // TODO } @@ -155,4 +176,4 @@ public class TestBundleJobQueryExecutor extends XDataTestCase { assertEquals(retBean.getAppName(), "testApp"); assertEquals(retBean.getUser(), "oozie"); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java deleted file mode 100644 index 5b62fdf..0000000 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; - -public class TestCoordJobGetPendingActionsCountJPAExecutor extends XDataTestCase { - Services services; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - public void testCoordJobGetPendingActionsCount() throws Exception { - int actionNum = 1; - CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); - String jobId = job.getId(); - // Insert 2 coordinator actions with pending true and 1 coordinator action with pending false - addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 1); - addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 1); - addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); - - _testCoordActionsPendingCount(jobId, 2); - } - - private void _testCoordActionsPendingCount(String jobId, int expectedSize) throws Exception { - JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); - // Call JPAExecutor to get actions which are pending - CoordJobGetPendingActionsCountJPAExecutor actionGetCmd = new CoordJobGetPendingActionsCountJPAExecutor(jobId); - int pendingCount = jpaService.execute(actionGetCmd); - // As two actions are pending, expected count is 2 - assertEquals(pendingCount, expectedSize); - - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java index 1f9e76a..364d71d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java +++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.service; import java.util.Date; @@ -55,6 +54,7 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; +import org.apache.oozie.lock.LockToken; import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.DateUtils; @@ -824,7 +824,8 @@ public class TestStatusTransitService extends XDataTestCase { assertNotNull(jpaService); final String bundleId = job.getId(); - addRecordToBundleActionTable(bundleId, "action1", 0, Job.Status.KILLED); + CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false); + addRecordToBundleActionTable(bundleId, coord1.getId(), "action1", 0, Job.Status.KILLED); String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); @@ -1288,7 +1289,10 @@ public class TestStatusTransitService extends XDataTestCase { final String bundleId = bundleJob.getId(); addRecordToBundleActionTable(bundleId, "action1", 1, Job.Status.PAUSED); addRecordToBundleActionTable(bundleId, "action2", 1, Job.Status.PAUSED); - addRecordToBundleActionTable(bundleId, "action3", 0, Job.Status.FAILED); + BundleActionBean bundleAction = addRecordToBundleActionTable(bundleId, "action3", 0, Job.Status.FAILED); + bundleAction.setCoordId("test"); + BundleActionQueryExecutor.getInstance().executeUpdate( + BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleAction); Runnable runnable = new StatusTransitRunnable(); runnable.run(); @@ -1443,13 +1447,9 @@ public class TestStatusTransitService extends XDataTestCase { BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); final JPAService jpaService = Services.get().get(JPAService.class); final String bundleId = bundleJob.getId(); - BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleId, "action1-C", 1, Job.Status.RUNNING); - String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); - Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); - Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); - - addRecordToCoordJobTableWithBundle(bundleId, "action1-C", CoordinatorJob.Status.RUNNING, start, end, false, - true, 1); + CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 1, + Job.Status.RUNNING); bundleJob.setPending(); bundleAction1.setStatus(Job.Status.KILLED); bundleAction1.setPending(0); @@ -1467,7 +1467,9 @@ public class TestStatusTransitService extends XDataTestCase { bundleAction1.setPending(1); bundleAction1.setStatus(Job.Status.RUNNING); + bundleAction1.setLastModifiedTime(new Date()); bundleJob.setPending(); + bundleJob.setLastModifiedTime(new Date()); BundleActionQueryExecutor.getInstance().executeUpdate( BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleAction1); BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); @@ -1479,15 +1481,20 @@ public class TestStatusTransitService extends XDataTestCase { // Test bundle transition from running to runningwitherror when one action is killed. public void testBundleStatusTransitRunningWithError() throws Exception { - Services.get().destroy(); setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); - new Services().init(); + services = new Services(); + services.init(); + + final CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false); + final CoordinatorJobBean coord2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + final CoordinatorJobBean coord3 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false); BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); final String bundleId = bundleJob.getId(); - addRecordToBundleActionTable(bundleId, "action1-C", 0, Job.Status.PREP); - addRecordToBundleActionTable(bundleId, "action2-C", 0, Job.Status.RUNNING); - BundleActionBean action3 = addRecordToBundleActionTable(bundleId, "action3-C", 0, Job.Status.DONEWITHERROR); + addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 0, Job.Status.PREP); + addRecordToBundleActionTable(bundleId, coord2.getId(), "action2-C", 0, Job.Status.RUNNING); + BundleActionBean action3 = addRecordToBundleActionTable(bundleId, coord3.getId(), "action3-C", 0, + Job.Status.DONEWITHERROR); Runnable runnable = new StatusTransitRunnable(); runnable.run(); @@ -1505,4 +1512,122 @@ public class TestStatusTransitService extends XDataTestCase { assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); } + + public void testBundleStatusTransitWithLock() throws Exception { + setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); + services = new Services(); + services.init(); + + BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); + + final String jobId = bundleJob.getId(); + final String bundleId = bundleJob.getId(); + addRecordToBundleActionTable(bundleId, "action1-C", 0, Job.Status.PREP); + addRecordToBundleActionTable(bundleId, "action2-C", 0, Job.Status.RUNNING); + addRecordToBundleActionTable(bundleId, "action3-C", 0, Job.Status.DONEWITHERROR); + + JobLock lockThread = new JobLock(jobId); + new Thread(lockThread).start(); + + sleep(1000); + Runnable runnable = new StatusTransitRunnable(); + runnable.run(); + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); + synchronized (lockThread) { + lockThread.notifyAll(); + } + sleep(1000); + runnable.run(); + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus()); + + } + + public void testCoordStatusTransitWithLock() throws Exception { + setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); + services = new Services(); + services.init(); + + final JPAService jpaService = Services.get().get(JPAService.class); + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, + 1); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", null, + "KILLED", 0); + + final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + JobLock lockThread = new JobLock(coordJob.getId()); + new Thread(lockThread).start(); + Runnable runnable = new StatusTransitRunnable(); + runnable.run(); + sleep(1000); + coordJob = jpaService.execute(coordJobGetCmd); + assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus()); + + synchronized (lockThread) { + lockThread.notifyAll(); + } + runnable.run(); + coordJob = jpaService.execute(coordJobGetCmd); + assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, coordJob.getStatus()); + + } + + public void testBundleStatusCoordSubmitFails() throws Exception { + setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); + services = new Services(); + services.init(); + BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + + final String bundleId = bundleJob.getId(); + addRecordToBundleActionTable(bundleId, null, 0, Job.Status.FAILED); + Runnable runnable = new StatusTransitRunnable(); + runnable.run(); + // First try will kill the job. + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.FAILED, bundleJob.getStatus()); + sleep(1000); + bundleJob.setStatus(Job.Status.RUNNING); + BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); + runnable.run(); + // second try will change the status. + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.FAILED, bundleJob.getStatus()); + + } + + static class JobLock implements Runnable { + String jobId; + + public JobLock(String jobId) { + this.jobId = jobId; + } + + LockToken lock = null; + + public void acquireLock() throws InterruptedException { + lock = Services.get().get(MemoryLocksService.class).getWriteLock(jobId, 0); + } + + public void release() { + lock.release(); + } + + @Override + public void run() { + try { + acquireLock(); + synchronized (this) { + this.wait(); + } + release(); + } + catch (InterruptedException e) { + } + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ba512c5..f4a866d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1940 StatusTransitService has race condition (puru) OOZIE-1696 Document how to get the action conf in the Java action (jrkinley via rkanter) OOZIE-1567 Provide a wait tool in Oozie (rkanter) OOZIE-2014 TestAuthFilterAuthOozieClient fails after OOZIE-1917 (rkanter)
