http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java 
b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index 0d549a0..77dcda9 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -18,42 +18,27 @@
 
 package org.apache.oozie.service;
 
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.TreeSet;
-import java.util.Comparator;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
-import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.bundle.BundleKillXCommand;
-import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
+import org.apache.oozie.command.bundle.BundleStatusTransitXCommand;
+import org.apache.oozie.command.coord.CoordStatusTransitXCommand;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
-import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
-import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.util.StatusUtils;
 import org.apache.oozie.util.XLog;
 
 /**
@@ -64,13 +49,15 @@ import org.apache.oozie.util.XLog;
  * SUCCEEDED.
  */
 public class StatusTransitService implements Service {
-    public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"StatusTransitService.";
-    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + 
"statusTransit.interval";
-    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = 
CONF_PREFIX + "backward.support.for.coord.status";
-    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR 
= CONF_PREFIX + "backward.support.for.states.without.error";
-    private static int limit = -1;
-    private static Date lastInstanceStartTime = null;
-    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
+    private static final String CONF_PREFIX = Service.CONF_PREFIX + 
"StatusTransitService.";
+    private static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + 
"statusTransit.interval";
+    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = 
CONF_PREFIX
+            + "backward.support.for.coord.status";
+    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR 
= CONF_PREFIX
+            + "backward.support.for.states.without.error";
+    public static int limit = -1;
+    public static Date lastInstanceStartTime = null;
+    public final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
 
     /**
      * StateTransitRunnable is the runnable which is scheduled to run at the 
configured interval.
@@ -83,6 +70,9 @@ public class StatusTransitService implements Service {
         private JPAService jpaService = null;
         private LockToken lock;
 
+        private Set<String> coordFailedIds = new HashSet<String>();
+        private Set<String> bundleFailedIds = new HashSet<String>();
+
         public StatusTransitRunnable() {
             jpaService = Services.get().get(JPAService.class);
             if (jpaService == null) {
@@ -93,22 +83,19 @@ public class StatusTransitService implements Service {
         @Override
         public void run() {
             try {
-                Date curDate = new Date(); // records the start time of this 
service run;
+                final Date curDate = new Date(); // records the start time of 
this service run;
 
                 // first check if there is some other instance running;
-                lock = 
Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
-                        lockTimeout);
+                lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(StatusTransitService.class.getName(), 
lockTimeout);
                 if (lock == null) {
                     LOG.info("This StatusTransitService instance"
                             + " will not run since there is already an 
instance running");
                 }
                 else {
                     LOG.info("Acquired lock for [{0}]", 
StatusTransitService.class.getName());
-                    // running coord jobs transit service
                     coordTransit();
-                    // running bundle jobs transit service
                     bundleTransit();
-
                     lastInstanceStartTime = curDate;
                 }
             }
@@ -116,7 +103,6 @@ public class StatusTransitService implements Service {
                 LOG.warn("Exception happened during StatusTransitRunnable ", 
ex);
             }
             finally {
-                // release lock;
                 if (lock != null) {
                     lock.release();
                     LOG.info("Released lock for [{0}]", 
StatusTransitService.class.getName());
@@ -124,22 +110,6 @@ public class StatusTransitService implements Service {
             }
         }
 
-        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> 
pendingJobList) {
-            Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new 
Comparator<BundleJobBean>() {
-                @Override
-                public int compare(BundleJobBean b1, BundleJobBean b2) {
-                    if (b1.getId().equals(b2.getId())) {
-                        return 0;
-                    }
-                    else {
-                        return 1;
-                    }
-                }
-            });
-            s.addAll(pendingJobList);
-            return new ArrayList<BundleJobBean>(s);
-        }
-
         /**
          * Aggregate bundle actions' status to bundle jobs
          *
@@ -147,11 +117,13 @@ public class StatusTransitService implements Service {
          * @throws CommandException thrown if failed to run commands
          */
         private void bundleTransit() throws JPAExecutorException, 
CommandException {
-            List<BundleJobBean> pendingJobCheckList = null;
+            List<BundleJobBean> pendingJobCheckList;
+            final Set<String> bundleIds = new HashSet<String>();
 
             if (lastInstanceStartTime == null) {
                 LOG.info("Running bundle status service first instance");
-                // this is the first instance, we need to check for all 
pending or running jobs;
+                // This is the first instance, so we need to check all pending or running jobs.
+                // TODO: limit is currently -1; actual batching still needs to be implemented.
                 pendingJobCheckList = jpaService.execute(new 
BundleJobsGetRunningOrPendingJPAExecutor(limit));
             }
             else {
@@ -159,556 +131,30 @@ public class StatusTransitService implements Service {
                         + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                 // this is not the first instance, we should only check jobs 
that have actions been
                 // updated >= start time of last service run;
-                List<BundleActionBean> actionList = 
BundleActionQueryExecutor.getInstance().getList(
-                        
BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, 
lastInstanceStartTime);
-                Set<String> bundleIds = new HashSet<String>();
-                for (BundleActionBean action : actionList) {
-                    bundleIds.add(action.getBundleId());
-                }
-                pendingJobCheckList = new ArrayList<BundleJobBean>();
-                for (String bundleId : bundleIds.toArray(new 
String[bundleIds.size()])) {
-                    BundleJobBean bundle = 
BundleJobQueryExecutor.getInstance().get(
-                            
BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bundleId);
-                    // Running bundle job might have pending false
-                    if (bundle.isPending() || 
bundle.getStatus().equals(Job.Status.RUNNING)
-                            || 
bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
-                            || bundle.getStatus().equals(Job.Status.PAUSED)
-                            || 
bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
-                        pendingJobCheckList.add(bundle);
-                    }
-                }
-            }
-            aggregateBundleJobsStatus(pendingJobCheckList);
-        }
-
-        private void aggregateBundleJobsStatus(List<BundleJobBean> 
bundleLists) throws JPAExecutorException,
-                CommandException {
-            if (bundleLists != null) {
-                    for (BundleJobBean bundleJob : bundleLists) {
-                        try {
-                            String jobId = bundleJob.getId();
-                            Job.Status[] bundleStatus = new Job.Status[1];
-                            bundleStatus[0] = bundleJob.getStatus();
-                            List<BundleActionBean> bundleActions = 
BundleActionQueryExecutor.getInstance().getList(
-                                
BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
-                            HashMap<Job.Status, Integer> bundleActionStatus = 
new HashMap<Job.Status, Integer>();
-                            boolean foundPending = false;
-                            for (BundleActionBean bAction : bundleActions) {
-                                    int counter = 0;
-                                    if 
(bundleActionStatus.containsKey(bAction.getStatus())) {
-                                        counter = 
bundleActionStatus.get(bAction.getStatus()) + 1;
-                                    }
-                                    else {
-                                        ++counter;
-                                    }
-                                    
bundleActionStatus.put(bAction.getStatus(), counter);
-                                    if (bAction.getCoordId() == null
-                                            && (bAction.getStatus() == 
Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
-                                        (new BundleKillXCommand(jobId)).call();
-                                        LOG.info("Bundle job ["+ jobId
-                                                        + "] has been killed 
since one of its coordinator job failed submission.");
-                                    }
-
-                                 if (bAction.isPending()) {
-                                    foundPending = true;
-                                }
-                            }
-
-                            if (!foundPending && 
checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
-                                LOG.info("Set bundle job [" + jobId + "] 
status to '" + bundleStatus[0].toString()
-                                        + "' from '" + bundleJob.getStatus() + 
"'");
-                                updateBundleJob(foundPending, bundleJob, 
bundleStatus[0]);
-                            }
-                            else if (!foundPending && 
checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
-                                LOG.info("Set bundle job [" + jobId + "] 
status to '" + bundleStatus[0].toString()
-                                        + "' from '" + bundleJob.getStatus() + 
"'");
-                                updateBundleJob(foundPending, bundleJob, 
bundleStatus[0]);
-                            }
-                            else if (checkPausedStatus(bundleActionStatus, 
bundleActions, bundleStatus)) {
-                                LOG.info("Set bundle job [" + jobId + "] 
status to '" + bundleStatus[0].toString()
-                                        + "' from '" + bundleJob.getStatus() + 
"'");
-                                updateBundleJob(foundPending, bundleJob, 
bundleStatus[0]);
-                            }
-                            else if (checkSuspendStatus(bundleActionStatus, 
bundleActions, bundleStatus, foundPending)) {
-                                LOG.info("Set bundle job [" + jobId + "] 
status to '" + bundleStatus[0].toString()
-                                        + "' from '" + bundleJob.getStatus() + 
"'");
-                                updateBundleJob(foundPending, bundleJob, 
bundleStatus[0]);
-                            }
-                            else if (checkRunningStatus(bundleActionStatus, 
bundleActions, bundleStatus)) {
-                                LOG.info("Set bundle job [" + jobId + "] 
status to '" + bundleStatus[0].toString()
-                                        + "' from '" + bundleJob.getStatus() + 
"'");
-                                updateBundleJob(foundPending, bundleJob, 
bundleStatus[0]);
-                            }
-                        }
-                        catch (Exception ex) {
-                            LOG.error("Exception happened during aggregate 
bundle job's status, job = "
-                                    + bundleJob.getId(), ex);
-                        }
+                pendingJobCheckList = 
BundleJobQueryExecutor.getInstance().getList(
+                        BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, 
lastInstanceStartTime);
+            }
+            for (BundleJobBean job : pendingJobCheckList) {
+                bundleIds.add(job.getId());
+            }
+            bundleIds.addAll(bundleFailedIds);
+            bundleFailedIds.clear();
+            for (final String jobId : bundleIds) {
+                try {
+                    new BundleStatusTransitXCommand(jobId).call();
+                }
+                catch (CommandException e) {
+                    // Unable to acquire the lock (E0606): keep the job id so it is retried on the next run.
+                    if (e.getErrorCode() == ErrorCode.E0606) {
+                        bundleFailedIds.add(jobId);
+                        LOG.info("Unable to acquire lock for " + jobId + ". 
Will try next time");
                     }
-                }
-
-        }
-
-        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> 
CoordList) throws JPAExecutorException,
-                CommandException {
-            if (CoordList != null) {
-                Configuration conf = Services.get().getConf();
-                boolean backwardSupportForCoordStatus = 
conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
-
-                for (CoordinatorJobBean coordJob : CoordList) {
-                    try {
-                        // if namespace 0.1 is used and backward support is 
true, then ignore this coord job
-                        if (backwardSupportForCoordStatus == true && 
coordJob.getAppNamespace() != null
-                                && 
coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
-                            continue;
-                        }
-                        String jobId = coordJob.getId();
-                        Job.Status[] coordStatus = new Job.Status[1];
-                        coordStatus[0] = coordJob.getStatus();
-                        //Get count of Coordinator actions with pending true
-                        boolean isPending = false;
-                        int count = jpaService.execute(new 
CoordJobGetPendingActionsCountJPAExecutor(jobId));
-                        if (count > 0) {
-                             isPending = true;
-                        }
-                        // Get status of Coordinator actions
-                        List<CoordinatorAction.Status> coordActionStatusList = 
jpaService
-                                .execute(new 
CoordJobGetActionsStatusJPAExecutor(jobId));
-                        HashMap<CoordinatorAction.Status, Integer> 
coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
-
-                        for (CoordinatorAction.Status status : 
coordActionStatusList) {
-                            int counter = 0;
-                            if (coordActionStatus.containsKey(status)) {
-                                counter = coordActionStatus.get(status) + 1;
-                            }
-                            else {
-                                ++counter;
-                            }
-                            coordActionStatus.put(status, counter);
-                        }
-
-                        int nonPendingCoordActionsCount = 
coordActionStatusList.size();
-                        boolean isDoneMaterialization = 
coordJob.isDoneMaterialization();
-                        if ((isDoneMaterialization || coordStatus[0] == 
Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
-                                && checkCoordTerminalStatus(coordActionStatus, 
nonPendingCoordActionsCount,
-                                        coordStatus, isDoneMaterialization)) {
-                            updateCoordJob(isPending, coordJob, 
coordStatus[0]);
-                        }
-                        else if (checkCoordPausedStatus(coordActionStatus, 
nonPendingCoordActionsCount, coordStatus)) {
-                            updateCoordJob(isPending, coordJob, 
coordStatus[0]);
-                        }
-                        else if(checkCoordSuspendStatus( coordActionStatus, 
nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), 
isPending)) {
-                            updateCoordJob(isPending, coordJob, 
coordStatus[0]);
-                        }
-                        else if (checkCoordRunningStatus(coordActionStatus, 
nonPendingCoordActionsCount, coordStatus)) {
-                            updateCoordJob(isPending, coordJob, 
coordStatus[0]);
-                        }
-                        else {
-                            checkCoordPending(isPending, coordJob, true);
-                        }
-                    }
-                    catch (Exception ex) {
-                        LOG.error("Exception happened during aggregate 
coordinator job's status, job = "
-                                + coordJob.getId(), ex);
+                    else {
+                        LOG.error("Error running BundleStatusTransitXCommand 
for job " + jobId, e);
                     }
-                }
-
-            }
-        }
-
-        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> 
bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] 
bundleStatus) {
-            boolean ret = false;
-            int totalValuesSucceed = 0;
-            if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
-                totalValuesSucceed = 
bundleActionStatus.get(Job.Status.SUCCEEDED);
-            }
-            int totalValuesFailed = 0;
-            if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
-                totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
-            }
-            int totalValuesKilled = 0;
-            if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
-                totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
-            }
-
-            int totalValuesDoneWithError = 0;
-            if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
-                totalValuesDoneWithError = 
bundleActionStatus.get(Job.Status.DONEWITHERROR);
-            }
-
-            if (bundleActions.size() == (totalValuesSucceed + 
totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
-                // If all bundle action is done and bundle is killed, then 
don't change the status.
-                if (bundleStatus[0].equals(Job.Status.KILLED)) {
-                    bundleStatus[0] = Job.Status.KILLED;
-                    return true;
-                }
-                // If all the bundle actions are succeeded then bundle job 
should be succeeded.
-                if (bundleActions.size() == totalValuesSucceed) {
-                    bundleStatus[0] = Job.Status.SUCCEEDED;
-                    ret = true;
-                }
-                else if (bundleActions.size() == totalValuesKilled) {
-                    // If all the bundle actions are KILLED then bundle job 
should be KILLED.
-                    bundleStatus[0] = Job.Status.KILLED;
-                    ret = true;
-                }
-                else if (bundleActions.size() == totalValuesFailed) {
-                    // If all the bundle actions are FAILED then bundle job 
should be FAILED.
-                    bundleStatus[0] = Job.Status.FAILED;
-                    ret = true;
-                }
-                else {
-                    bundleStatus[0] = Job.Status.DONEWITHERROR;
-                    ret = true;
-                }
-            }
-            return ret;
-        }
-
-        private boolean 
checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> 
coordActionStatus,
-                int coordActionsCount, Job.Status[] coordStatus, boolean 
isDoneMaterialization) {
-            boolean ret = false;
-            int totalValuesSucceed = 0;
-            if 
(coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
-                totalValuesSucceed = 
coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
-            }
-            int totalValuesFailed = 0;
-            if 
(coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
-                totalValuesFailed = 
coordActionStatus.get(CoordinatorAction.Status.FAILED);
-            }
-            int totalValuesKilled = 0;
-            if 
(coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
-                totalValuesKilled = 
coordActionStatus.get(CoordinatorAction.Status.KILLED);
-            }
-            int totalValuesTimeOut = 0;
-            if 
(coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
-                totalValuesTimeOut = 
coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
-            }
-            int totalValuesSkipped = 0;
-            if 
(coordActionStatus.containsKey(CoordinatorAction.Status.SKIPPED)) {
-                totalValuesSkipped = 
coordActionStatus.get(CoordinatorAction.Status.SKIPPED);
-            }
-
-            if (coordActionsCount ==
-                    (totalValuesSucceed + totalValuesFailed + 
totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) {
 
-                // If all coord action is done and coord is killed, then don't 
change the status.
-                if (coordStatus[0].equals(Job.Status.KILLED)) {
-                    coordStatus[0] = Job.Status.KILLED;
-                    return true;
-                }
-                // If all the coordinator actions are succeeded then 
coordinator job should be succeeded.
-                if (coordActionsCount == (totalValuesSucceed + 
totalValuesSkipped) && isDoneMaterialization) {
-                    coordStatus[0] = Job.Status.SUCCEEDED;
-                    ret = true;
-                }
-                else if (coordActionsCount == totalValuesKilled) {
-                    // If all the coordinator actions are KILLED then 
coordinator job should be KILLED.
-                    coordStatus[0] = Job.Status.KILLED;
-                    ret = true;
-                }
-                else if (coordActionsCount == totalValuesFailed) {
-                    // If all the coordinator actions are FAILED then 
coordinator job should be FAILED.
-                    coordStatus[0] = Job.Status.FAILED;
-                    ret = true;
-                }
-                else {
-                    coordStatus[0] = Job.Status.DONEWITHERROR;
-                    ret = true;
                 }
             }
-            return ret;
-        }
-
-        private boolean checkPrepStatus(HashMap<Job.Status, Integer> 
bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] 
bundleStatus) {
-            boolean ret = false;
-            if (bundleActionStatus.containsKey(Job.Status.PREP)) {
-                // If all the bundle actions are PREP then bundle job should 
be RUNNING.
-                if (bundleActions.size() > 
bundleActionStatus.get(Job.Status.PREP)) {
-                    bundleStatus[0] = getRunningStatus(bundleActionStatus);
-                    ret = true;
-                }
-            }
-            return ret;
-        }
-
-        private boolean checkPausedStatus(HashMap<Job.Status, Integer> 
bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] 
bundleJobStatus) {
-            boolean ret = false;
-
-            // TODO - When bottom up cmds are allowed to change the status of 
parent job,
-            // if none of the bundle actions are in paused or pausedwitherror, 
the function should return
-            // false
-
-            // top down
-            // If the bundle job is PAUSED or PAUSEDINERROR and no children 
are in error
-            // state, then job should be PAUSED otherwise it should be 
pausedwitherror
-            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] 
== Job.Status.PAUSEDWITHERROR) {
-                if (bundleActionStatus.containsKey(Job.Status.KILLED)
-                        || bundleActionStatus.containsKey(Job.Status.FAILED)
-                        || 
bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
-                        || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
-                        || 
bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
-                        || 
bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
-                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
-                }
-                else {
-                    bundleJobStatus[0] = Job.Status.PAUSED;
-                }
-                ret = true;
-            }
-
-            // bottom up; check the status of parent through their children
-            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
-                    && (bundleActions.size() == 
bundleActionStatus.get(Job.Status.PAUSED))) {
-                bundleJobStatus[0] = Job.Status.PAUSED;
-                ret = true;
-            }
-            else if 
(bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
-                int pausedActions = 
bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
-                        .get(Job.Status.PAUSED) : 0;
-                if (bundleActions.size() == pausedActions + 
bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
-                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
-                    ret = true;
-                }
-            }
-            else {
-                ret = false;
-            }
-            return ret;
-        }
-
-
-        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> 
bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] 
bundleStatus, boolean isPending) {
-            boolean ret = false;
-
-            // TODO - When bottom up cmds are allowed to change the status of 
parent job,
-            // if none of the bundle actions are in suspended or 
suspendedwitherror, the function should return
-            // false
-
-            // top down
-            // if job is suspended
-            if (bundleStatus[0] == Job.Status.SUSPENDED
-                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
-                if (bundleActionStatus.containsKey(Job.Status.KILLED)
-                        || bundleActionStatus.containsKey(Job.Status.FAILED)
-                        || 
bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
-                        || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
-                        || 
bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
-                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
-                }
-                else {
-                    bundleStatus[0] = Job.Status.SUSPENDED;
-                }
-                ret =true;
-            }
-
-            // bottom up
-            // Update status of parent from the status of its children
-            else if (!isPending && 
bundleActionStatus.containsKey(Job.Status.SUSPENDED)
-                    || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
-                int succeededActions = 
bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
-                        .get(Job.Status.SUCCEEDED) : 0;
-                int killedActions = 
bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
-                        .get(Job.Status.KILLED) : 0;
-                int failedActions = 
bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
-                        .get(Job.Status.FAILED) : 0;
-                int doneWithErrorActions = 
bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
-                        .get(Job.Status.DONEWITHERROR) : 0;
-
-                if (bundleActions.size() == 
bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
-                    bundleStatus[0] = Job.Status.SUSPENDED;
-                    ret = true;
-                }
-                else if (bundleActions.size()  == 
bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
-                        + bundleActionStatus.get(Job.Status.SUSPENDED) + 
succeededActions + killedActions + failedActions + doneWithErrorActions) {
-                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
-                    ret = true;
-                }
-            }
-            return ret;
-
-        }
-
-        private boolean 
checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> 
coordActionStatus,
-                int coordActionsCount, Job.Status[] coordStatus){
-            boolean ret = false;
-            if (coordStatus[0].equals(Job.Status.PAUSED) || 
coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
-                if 
(coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
-                    coordStatus[0] = Job.Status.PAUSEDWITHERROR;
-                }
-                else {
-                    coordStatus[0] = Job.Status.PAUSED;
-                }
-                ret = true;
-            }
-            return ret;
-        }
-        private boolean 
checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> 
coordActionStatus,
-                int coordActionsCount, Job.Status[] coordStatus, boolean 
isDoneMaterialization, boolean isPending) {
-            boolean ret = false;
-
-            // TODO - When bottom up cmds are allowed to change the status of 
parent job
-            //if none of the coord actions are in suspended or 
suspendedwitherror and materialization done is false
-            //,then the function should return
-            // false
-
-            // top down
-            // check for children only when parent is suspended
-            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == 
Job.Status.SUSPENDEDWITHERROR) {
-
-                if 
(coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
-                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
-                }
-                else {
-                    coordStatus[0] = Job.Status.SUSPENDED;
-                }
-                ret = true;
-            }
-            // bottom up
-            // look for children to check the parent's status only if 
materialization is
-            // done and all actions are non-pending
-            else if (isDoneMaterialization && !isPending && 
coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
-                int succeededActions = 
coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? 
coordActionStatus
-                       .get(CoordinatorAction.Status.SUCCEEDED) : 0;
-                int killedActions = 
coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? 
coordActionStatus
-                        .get(CoordinatorAction.Status.KILLED) : 0;
-                int failedActions = 
coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? 
coordActionStatus
-                        .get(CoordinatorAction.Status.FAILED) : 0;
-                int timedoutActions = 
coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? 
coordActionStatus
-                        .get(CoordinatorAction.Status.TIMEDOUT) : 0;
-
-                if (coordActionsCount == 
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
-                    coordStatus[0] = Job.Status.SUSPENDED;
-                    ret = true;
-                }
-                else if (coordActionsCount == 
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
-                        + succeededActions + killedActions + failedActions + 
timedoutActions) {
-                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
-                    ret = true;
-                }
-            }
-            return ret;
-        }
-
-        private boolean 
checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> 
coordActionStatus,
-                int coordActionsCount, Job.Status[] coordStatus) {
-            boolean ret = false;
-            if (coordStatus[0] != Job.Status.PREP) {
-                if 
(coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
-                        || 
coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
-                    coordStatus[0] = Job.Status.RUNNINGWITHERROR;
-                }
-                else {
-                    coordStatus[0] = Job.Status.RUNNING;
-                }
-                ret = true;
-            }
-            return ret;
-        }
-
-        private boolean checkRunningStatus(HashMap<Job.Status, Integer> 
bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] 
bundleStatus) {
-            boolean ret = false;
-            if (bundleStatus[0] != Job.Status.PREP) {
-                bundleStatus[0] = getRunningStatus(bundleActionStatus);
-                ret = true;
-            }
-            return ret;
-
-        }
-
-        private Job.Status getRunningStatus(HashMap<Job.Status, Integer> 
bundleActionStatus) {
-            if (bundleActionStatus.containsKey(Job.Status.FAILED)
-                    || bundleActionStatus.containsKey(Job.Status.KILLED)
-                    || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
-                    || 
bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
-                return Job.Status.RUNNINGWITHERROR;
-            }
-            else {
-                return Job.Status.RUNNING;
-            }
-        }
-
-        private void updateBundleJob(boolean isPending, BundleJobBean 
bundleJob, Job.Status bundleStatus)
-                throws JPAExecutorException {
-            String jobId = bundleJob.getId();
-            // Update the Bundle Job
-            // Check for backward support when RUNNINGWITHERROR, 
SUSPENDEDWITHERROR and
-            // PAUSEDWITHERROR is not supported
-            
bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
-            if (isPending) {
-                bundleJob.setPending();
-                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
-            }
-            else {
-                bundleJob.resetPending();
-                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
-            }
-            
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
-                    bundleJob);
-        }
-
-        private void updateCoordJob(boolean isPending, CoordinatorJobBean 
coordJob, Job.Status coordStatus)
-                throws JPAExecutorException, CommandException {
-            Job.Status prevStatus = coordJob.getStatus();
-            // Update the Coord Job
-            if (coordJob.getStatus() == Job.Status.SUCCEEDED || 
coordJob.getStatus() == Job.Status.FAILED
-                    || coordJob.getStatus() == Job.Status.KILLED || 
coordJob.getStatus() == Job.Status.DONEWITHERROR) {
-                if (coordStatus == Job.Status.SUSPENDED || coordStatus == 
Job.Status.SUSPENDEDWITHERROR) {
-                    LOG.info("Coord Job [" + coordJob.getId()
-                            + "] status to "+ coordStatus +" can not be 
updated as its already in Terminal state");
-                    return;
-                }
-            }
-
-            boolean isPendingStateChanged = checkCoordPending(isPending, 
coordJob, false);
-            // Check for backward support when RUNNINGWITHERROR, 
SUSPENDEDWITHERROR and PAUSEDWITHERROR is
-            // not supported
-            
coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
-            // Backward support when coordinator namespace is 0.1
-            coordJob.setStatus(StatusUtils.getStatus(coordJob));
-            if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
-                LOG.info("Set coordinator job [" + coordJob.getId() + "] 
status to '" + coordJob.getStatus() + "' from '"
-                        + prevStatus + "'");
-                coordJob.setLastModifiedTime(new Date());
-                
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
 coordJob);
-            }
-            // update bundle action only when status changes in coord job
-            if (coordJob.getBundleId() != null) {
-                if (!prevStatus.equals(coordJob.getStatus())) {
-                    BundleStatusUpdateXCommand bundleStatusUpdate = new 
BundleStatusUpdateXCommand(coordJob, prevStatus);
-                    bundleStatusUpdate.call();
-                }
-            }
-        }
-
-        private boolean checkCoordPending(boolean isPending, 
CoordinatorJobBean coordJob, boolean saveToDB)
-                throws JPAExecutorException {
-            // Checking the coordinator pending should be updated or not
-            boolean prevPending = coordJob.isPending();
-            if (isPending) {
-                coordJob.setPending();
-            }
-            else {
-                coordJob.resetPending();
-            }
-            boolean hasChange = prevPending != coordJob.isPending();
-            if (saveToDB && hasChange) {
-                LOG.info("Change coordinator job [" + coordJob.getId() + "] 
pending to '" + coordJob.isPending()
-                        + "' from '" + prevPending + "'");
-                
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
 coordJob);
-            }
-            return hasChange;
-
         }
 
         /**
@@ -719,6 +165,7 @@ public class StatusTransitService implements Service {
          */
         private void coordTransit() throws JPAExecutorException, 
CommandException {
             List<CoordinatorJobBean> pendingJobCheckList = null;
+            final Set<String> coordIds = new HashSet<String>();
             if (lastInstanceStartTime == null) {
                 LOG.info("Running coordinator status service first instance");
                 // this is the first instance, we need to check for all 
pending jobs;
@@ -727,44 +174,36 @@ public class StatusTransitService implements Service {
             else {
                 LOG.info("Running coordinator status service from last 
instance time =  "
                         + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
-                // this is not the first instance, we should only check jobs
-                // that have actions or jobs been
-                // updated >= start time of last service run;
-                List<CoordinatorActionBean> actionsList = 
CoordActionQueryExecutor.getInstance().getList(
-                        
CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, 
lastInstanceStartTime);
-                Set<String> coordIds = new HashSet<String>();
-                for (CoordinatorActionBean action : actionsList) {
-                    coordIds.add(action.getJobId());
+                // this is not the first instance, so we only check jobs whose
+                // actions, or which themselves, were updated >= start time of 
last service run;
+                pendingJobCheckList = 
CoordJobQueryExecutor.getInstance().getList(
+                        CoordJobQuery.GET_COORD_IDS_FOR_STATUS_TRANSIT, 
lastInstanceStartTime);
+
+                
pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
+                        CoordJobQuery.GET_COORD_JOBS_CHANGED, 
lastInstanceStartTime));
+            }
+            for (final CoordinatorJobBean job : pendingJobCheckList) {
+                coordIds.add(job.getId());
+            }
+            coordIds.addAll(coordFailedIds);
+            coordFailedIds.clear();
+            for (final String coordId : coordIds) {
+                try {
+                    new CoordStatusTransitXCommand(coordId).call();
                 }
+                catch (CommandException e) {
+                    // Unable to acquire lock. Will try next time
+                    if (e.getErrorCode() == ErrorCode.E0606) {
+                        coordFailedIds.add(coordId);
+                        LOG.info("Unable to acquire lock for " + coordId + ". 
Will try next time");
 
-                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
-                for (String coordId : coordIds.toArray(new 
String[coordIds.size()])) {
-                    CoordinatorJobBean coordJob;
-                    try {
-                        coordJob = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
-                    }
-                    catch (JPAExecutorException jpaee) {
-                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
-                            LOG.warn("Exception happened during 
StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
-                            continue;
-                        } else {
-                            throw jpaee;
-                        }
                     }
-                    // Running coord job might have pending false
-                    Job.Status coordJobStatus = coordJob.getStatus();
-                    if ((coordJob.isPending() || 
coordJobStatus.equals(Job.Status.PAUSED)
-                            || coordJobStatus.equals(Job.Status.RUNNING)
-                            || 
coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
-                            || 
coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
-                            && !coordJobStatus.equals(Job.Status.IGNORED)) {
-                        pendingJobCheckList.add(coordJob);
+                    else {
+                        LOG.error("Error running CoordStatusTransitXCommand 
for job " + coordId, e);
                     }
+
                 }
-                
pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
-                        CoordJobQuery.GET_COORD_JOBS_CHANGED, 
lastInstanceStartTime));
             }
-            aggregateCoordJobsStatus(pendingJobCheckList);
         }
     }
 
@@ -775,7 +214,7 @@ public class StatusTransitService implements Service {
      */
     @Override
     public void init(Services services) {
-        Configuration conf = services.getConf();
+        final Configuration conf = services.getConf();
         Runnable stateTransitRunnable = new StatusTransitRunnable();
         services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
                 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), 
SchedulerService.Unit.SEC);
@@ -797,5 +236,5 @@ public class StatusTransitService implements Service {
     public Class<? extends Service> getInterface() {
         return StatusTransitService.class;
     }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java
index 1fbc3d8..11a1610 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionQueryExecutor.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.executor.jpa;
 
 import java.sql.Timestamp;
@@ -97,10 +96,6 @@ public class TestBundleActionQueryExecutor extends 
XDataTestCase {
         assertEquals(query.getParameterValue("bundleActionId"), 
bean.getBundleId());
 
         query = BundleActionQueryExecutor.getInstance().getSelectQuery(
-                BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, 
em, bean.getLastModifiedTime());
-        assertEquals(query.getParameterValue("lastModifiedTime"), 
bean.getLastModifiedTimestamp());
-
-        query = BundleActionQueryExecutor.getInstance().getSelectQuery(
                 BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, em, 
(long) 100);
         Date date = DateUtils.toDate((Timestamp) 
(query.getParameterValue("lastModifiedTime")));
         assertTrue(date.before(Calendar.getInstance().getTime()));
@@ -138,20 +133,10 @@ public class TestBundleActionQueryExecutor extends 
XDataTestCase {
 
     public void testGetList() throws Exception {
         BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, 
false);
-        BundleActionBean bean1 = 
this.addRecordToBundleActionTable(job.getId(), "coord1", 0, Job.Status.PREP);
-        BundleActionBean bean2 = 
this.addRecordToBundleActionTable(job.getId(), "coord2", 1, Job.Status.RUNNING);
-        BundleActionBean bean3 = 
this.addRecordToBundleActionTable(job.getId(), "coord3", 1, Job.Status.RUNNING);
-        // GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME
-        Date timeBefore = new Date(bean1.getLastModifiedTime().getTime() - 
1000 * 60);
+        this.addRecordToBundleActionTable(job.getId(), "coord1", 0, 
Job.Status.PREP);
+        this.addRecordToBundleActionTable(job.getId(), "coord2", 1, 
Job.Status.RUNNING);
+        this.addRecordToBundleActionTable(job.getId(), "coord3", 1, 
Job.Status.RUNNING);
         List<BundleActionBean> bActions = 
BundleActionQueryExecutor.getInstance().getList(
-                BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, 
timeBefore);
-        assertEquals(3, bActions.size());
-        Date timeAfter = new Date(bean3.getLastModifiedTime().getTime() + 100 
* 60);
-        bActions = BundleActionQueryExecutor.getInstance().getList(
-                BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, 
timeAfter);
-        assertEquals(0, bActions.size());
-        // GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN
-        bActions = BundleActionQueryExecutor.getInstance().getList(
                 BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, 
(long) (1000 * 60));
         assertEquals(0, bActions.size());
         bActions = BundleActionQueryExecutor.getInstance().getList(
@@ -178,4 +163,4 @@ public class TestBundleActionQueryExecutor extends 
XDataTestCase {
         assertEquals(retBean.getCoordName(), "testApp");
         assertEquals(retBean.getStatus(), Job.Status.RUNNING);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
index 0326b47..509eedd 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
@@ -15,12 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.executor.jpa;
 
+import java.util.Date;
+import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
+import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
@@ -141,6 +144,24 @@ public class TestBundleJobQueryExecutor extends 
XDataTestCase {
         assertEquals(bean.getId(), retBean.getId());
     }
 
+    public void testBundleIDsForStatusTransit() throws Exception {
+            BundleJobBean job1 = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+            BundleJobBean job2 = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+            BundleJobBean job3 = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+            BundleActionBean bean1 = 
this.addRecordToBundleActionTable(job1.getId(), "coord1", 0, Job.Status.PREP);
+            BundleActionBean bean2 = 
this.addRecordToBundleActionTable(job2.getId(), "coord2", 1, 
Job.Status.RUNNING);
+            BundleActionBean bean3 = 
this.addRecordToBundleActionTable(job3.getId(), "coord3", 1, 
Job.Status.RUNNING);
+            // GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
+            Date timeBefore = new Date(bean1.getLastModifiedTime().getTime() - 
1000 * 60);
+            List<BundleJobBean> jobBean = 
BundleJobQueryExecutor.getInstance().getList(
+                    BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, 
timeBefore);
+            assertEquals(3, jobBean.size());
+            Date timeAfter = new Date(bean3.getLastModifiedTime().getTime() + 
100 * 60);
+            jobBean = 
BundleJobQueryExecutor.getInstance().getList(BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT,
+                    timeAfter);
+            assertEquals(0, jobBean.size());
+        }
+
     public void testGetList() throws Exception {
         // TODO
     }
@@ -155,4 +176,4 @@ public class TestBundleJobQueryExecutor extends 
XDataTestCase {
         assertEquals(retBean.getAppName(), "testApp");
         assertEquals(retBean.getUser(), "oozie");
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java
deleted file mode 100644
index 5b62fdf..0000000
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetPendingActionsCountJPAExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestCoordJobGetPendingActionsCountJPAExecutor extends 
XDataTestCase {
-    Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testCoordJobGetPendingActionsCount() throws Exception {
-        int actionNum = 1;
-        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        String jobId = job.getId();
-        // Insert 2 coordinator actions with pending true and 1 coordinator 
action with pending false
-        addRecordToCoordActionTable(jobId, actionNum++, 
CoordinatorAction.Status.FAILED, "coord-action-get.xml", 1);
-        addRecordToCoordActionTable(jobId, actionNum++, 
CoordinatorAction.Status.KILLED, "coord-action-get.xml", 1);
-        addRecordToCoordActionTable(jobId, actionNum++, 
CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-
-        _testCoordActionsPendingCount(jobId, 2);
-    }
-
-    private void _testCoordActionsPendingCount(String jobId, int expectedSize) 
throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        // Call JPAExecutor to get actions which are pending
-        CoordJobGetPendingActionsCountJPAExecutor actionGetCmd = new 
CoordJobGetPendingActionsCountJPAExecutor(jobId);
-        int pendingCount = jpaService.execute(actionGetCmd);
-        // As two actions are pending, expected count is 2
-        assertEquals(pendingCount, expectedSize);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java 
b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
index 1f9e76a..364d71d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.util.Date;
@@ -55,6 +54,7 @@ import 
org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
@@ -824,7 +824,8 @@ public class TestStatusTransitService extends XDataTestCase 
{
         assertNotNull(jpaService);
 
         final String bundleId = job.getId();
-        addRecordToBundleActionTable(bundleId, "action1", 0, 
Job.Status.KILLED);
+        CoordinatorJobBean coord1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
+        addRecordToBundleActionTable(bundleId, coord1.getId(), "action1", 0, 
Job.Status.KILLED);
 
         String currentDatePlusMonth = 
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
         Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
@@ -1288,7 +1289,10 @@ public class TestStatusTransitService extends 
XDataTestCase {
         final String bundleId = bundleJob.getId();
         addRecordToBundleActionTable(bundleId, "action1", 1, 
Job.Status.PAUSED);
         addRecordToBundleActionTable(bundleId, "action2", 1, 
Job.Status.PAUSED);
-        addRecordToBundleActionTable(bundleId, "action3", 0, 
Job.Status.FAILED);
+        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleId, 
"action3", 0, Job.Status.FAILED);
+        bundleAction.setCoordId("test");
+        BundleActionQueryExecutor.getInstance().executeUpdate(
+                
BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, 
bundleAction);
 
         Runnable runnable = new StatusTransitRunnable();
         runnable.run();
@@ -1443,13 +1447,9 @@ public class TestStatusTransitService extends 
XDataTestCase {
         BundleJobBean bundleJob = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, true);
         final JPAService jpaService = Services.get().get(JPAService.class);
         final String bundleId = bundleJob.getId();
-        BundleActionBean bundleAction1 = 
addRecordToBundleActionTable(bundleId, "action1-C", 1, Job.Status.RUNNING);
-        String currentDatePlusMonth = 
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
-        Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
-        Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
-
-        addRecordToCoordJobTableWithBundle(bundleId, "action1-C", 
CoordinatorJob.Status.RUNNING, start, end, false,
-                true, 1);
+        CoordinatorJobBean coord1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        BundleActionBean bundleAction1 = 
addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 1,
+                Job.Status.RUNNING);
         bundleJob.setPending();
         bundleAction1.setStatus(Job.Status.KILLED);
         bundleAction1.setPending(0);
@@ -1467,7 +1467,9 @@ public class TestStatusTransitService extends 
XDataTestCase {
 
         bundleAction1.setPending(1);
         bundleAction1.setStatus(Job.Status.RUNNING);
+        bundleAction1.setLastModifiedTime(new Date());
         bundleJob.setPending();
+        bundleJob.setLastModifiedTime(new Date());
         BundleActionQueryExecutor.getInstance().executeUpdate(
                 
BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, 
bundleAction1);
         
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING,
 bundleJob);
@@ -1479,15 +1481,20 @@ public class TestStatusTransitService extends 
XDataTestCase {
 
     // Test bundle transition from running to runningwitherror when one action 
is killed.
     public void testBundleStatusTransitRunningWithError() throws Exception {
-        Services.get().destroy();
         
setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR,
 "false");
-        new Services().init();
+        services = new Services();
+        services.init();
+
+        final CoordinatorJobBean coord1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
+        final CoordinatorJobBean coord2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        final CoordinatorJobBean coord3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false);
 
         BundleJobBean bundleJob = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, true);
         final String bundleId = bundleJob.getId();
-        addRecordToBundleActionTable(bundleId, "action1-C", 0, 
Job.Status.PREP);
-        addRecordToBundleActionTable(bundleId, "action2-C", 0, 
Job.Status.RUNNING);
-        BundleActionBean action3 = addRecordToBundleActionTable(bundleId, 
"action3-C", 0, Job.Status.DONEWITHERROR);
+        addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 0, 
Job.Status.PREP);
+        addRecordToBundleActionTable(bundleId, coord2.getId(), "action2-C", 0, 
Job.Status.RUNNING);
+        BundleActionBean action3 = addRecordToBundleActionTable(bundleId, 
coord3.getId(), "action3-C", 0,
+                Job.Status.DONEWITHERROR);
 
         Runnable runnable = new StatusTransitRunnable();
         runnable.run();
@@ -1505,4 +1512,122 @@ public class TestStatusTransitService extends XDataTestCase {
         assertEquals(Job.Status.RUNNING, bundleJob.getStatus());
 
     }
+
+    public void testBundleStatusTransitWithLock() throws Exception {
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        services = new Services();
+        services.init();
+
+        BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true);
+
+        final String jobId = bundleJob.getId();
+        final String bundleId = bundleJob.getId();
+        addRecordToBundleActionTable(bundleId, "action1-C", 0, Job.Status.PREP);
+        addRecordToBundleActionTable(bundleId, "action2-C", 0, Job.Status.RUNNING);
+        addRecordToBundleActionTable(bundleId, "action3-C", 0, Job.Status.DONEWITHERROR);
+
+        JobLock lockThread = new JobLock(jobId);
+        new Thread(lockThread).start();
+
+        sleep(1000);
+        Runnable runnable = new StatusTransitRunnable();
+        runnable.run();
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.RUNNING, bundleJob.getStatus());
+        synchronized (lockThread) {
+            lockThread.notifyAll();
+        }
+        sleep(1000);
+        runnable.run();
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus());
+
+    }
+
+    public void testCoordStatusTransitWithLock() throws Exception {
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        services = new Services();
+        services.init();
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
+        Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
+        Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false,
+                1);
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", null,
+                "KILLED", 0);
+
+        final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        JobLock lockThread = new JobLock(coordJob.getId());
+        new Thread(lockThread).start();
+        Runnable runnable = new StatusTransitRunnable();
+        runnable.run();
+        sleep(1000);
+        coordJob = jpaService.execute(coordJobGetCmd);
+        assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
+
+        synchronized (lockThread) {
+            lockThread.notifyAll();
+        }
+        runnable.run();
+        coordJob = jpaService.execute(coordJobGetCmd);
+        assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, coordJob.getStatus());
+
+    }
+
+    public void testBundleStatusCoordSubmitFails() throws Exception {
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        services = new Services();
+        services.init();
+        BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+
+        final String bundleId = bundleJob.getId();
+        addRecordToBundleActionTable(bundleId, null, 0, Job.Status.FAILED);
+        Runnable runnable = new StatusTransitRunnable();
+        runnable.run();
+        // First try will kill the job.
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.FAILED, bundleJob.getStatus());
+        sleep(1000);
+        bundleJob.setStatus(Job.Status.RUNNING);
+        BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob);
+        runnable.run();
+        // second try will change the status.
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.FAILED, bundleJob.getStatus());
+
+    }
+
+    static class JobLock implements Runnable {
+        String jobId;
+
+        public JobLock(String jobId) {
+            this.jobId = jobId;
+        }
+
+        LockToken lock = null;
+
+        public void acquireLock() throws InterruptedException {
+            lock = Services.get().get(MemoryLocksService.class).getWriteLock(jobId, 0);
+        }
+
+        public void release() {
+            lock.release();
+        }
+
+        @Override
+        public void run() {
+            try {
+                acquireLock();
+                synchronized (this) {
+                    this.wait();
+                }
+                release();
+            }
+            catch (InterruptedException e) {
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ba512c5..f4a866d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1940 StatusTransitService has race condition (puru)
 OOZIE-1696 Document how to get the action conf in the Java action (jrkinley via rkanter)
 OOZIE-1567 Provide a wait tool in Oozie (rkanter)
 OOZIE-2014 TestAuthFilterAuthOozieClient fails after OOZIE-1917 (rkanter)

Reply via email to