OOZIE-1940 StatusTransitService has race condition

Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b87686b7
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b87686b7
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b87686b7

Branch: refs/heads/master
Commit: b87686b74ad2ab888b3a3b3b2521da66fe35a0c4
Parents: 1292c7f
Author: Purshotam Shah <[email protected]>
Authored: Tue Sep 30 10:17:00 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Tue Sep 30 10:17:00 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleActionBean.java |   2 -
 .../java/org/apache/oozie/BundleJobBean.java    |   3 +
 .../org/apache/oozie/CoordinatorActionBean.java |   2 +-
 .../org/apache/oozie/CoordinatorJobBean.java    |   5 +-
 .../main/java/org/apache/oozie/ErrorCode.java   |   3 +
 .../oozie/command/StatusTransitXCommand.java    | 160 +++++
 .../bundle/BundleStatusTransitXCommand.java     | 325 +++++++++
 .../coord/CoordStatusTransitXCommand.java       | 297 ++++++++
 .../executor/jpa/BundleActionQueryExecutor.java |   8 -
 .../executor/jpa/BundleJobQueryExecutor.java    |  13 +-
 .../executor/jpa/CoordActionQueryExecutor.java  |  27 +-
 ...ordJobGetPendingActionsCountJPAExecutor.java |  57 --
 .../executor/jpa/CoordJobQueryExecutor.java     |  13 +-
 .../oozie/service/StatusTransitService.java     | 695 ++-----------------
 .../jpa/TestBundleActionQueryExecutor.java      |  23 +-
 .../jpa/TestBundleJobQueryExecutor.java         |  25 +-
 ...ordJobGetPendingActionsCountJPAExecutor.java |  68 --
 .../oozie/service/TestStatusTransitService.java | 155 ++++-
 release-log.txt                                 |   1 +
 19 files changed, 1076 insertions(+), 806 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/BundleActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleActionBean.java 
b/core/src/main/java/org/apache/oozie/BundleActionBean.java
index 963497d..65bfe8c 100644
--- a/core/src/main/java/org/apache/oozie/BundleActionBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleActionBean.java
@@ -57,8 +57,6 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) 
from BundleActionBean w"),
 
-        @NamedQuery(name = "GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME", query = 
"select w.bundleId from BundleActionBean w where w.lastModifiedTimestamp >= 
:lastModifiedTime"),
-
         @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = 
"select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from 
BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= 
:lastModifiedTime"),
 
         @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from 
BundleActionBean w where w.bundleActionId = :bundleActionId"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java 
b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 76e76b7..4dbffc3 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -107,6 +107,9 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT 
COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"),
 
+        @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = 
"select DISTINCT w.id from BundleActionBean a , BundleJobBean w where 
a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and 
(w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 
'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.pending = 1)"),
+
+
         @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user 
from BundleJobBean w where w.id = :id") })
 @Table(name = "BUNDLE_JOBS")
 public class BundleJobBean implements Writable, BundleJob, JsonBean {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index cc5596b..c5a6ca8 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -137,7 +137,7 @@ import org.json.simple.JSONObject;
         @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select 
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 
0"),
 
         // Query to retrieve status of Coordinator actions
-        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = 
"select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND 
a.statusStr <> 'IGNORED'"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = 
"select a.statusStr, a.pending from CoordinatorActionBean a where a.jobId = 
:jobId AND a.statusStr <> 'IGNORED'"),
 
         // Query to retrieve status of Coordinator actions
         @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select 
a.statusStr from CoordinatorActionBean a where a.id = :id"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 03757dd..8f11645 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -133,8 +133,11 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOB_STATUS", query = "select w.statusStr 
from CoordinatorJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select 
w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")})
+        @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select 
w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id"),
 
+        @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select 
DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = 
a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN 
('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1)  
and w.statusStr <> 'IGNORED'")
+
+})
 @NamedNativeQueries({
         @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = 
"select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( 
w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 
and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions 
a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT'  or a.STATUS = 
'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER 
JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group 
by b.job_id having count(*) > 0 ) successJobs   on  failedJobs.job_id = 
successJobs.job_id where successJobs.job_id is null )")
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index f9f88f4..4afeb6c 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -207,6 +207,8 @@ public enum ErrorCode {
     E1022(XLog.STD, "Cannot delete running/completed coordinator action: 
[{0}]"),
     E1023(XLog.STD, "Coord Job update Error: [{0}]"),
     E1024(XLog.STD, "Cannot run ignore command: [{0}]"),
+    E1025(XLog.STD, "Coord status transit error: [{0}]"),
+
 
     E1100(XLog.STD, "Command precondition does not hold before execution, 
[{0}]"),
 
@@ -235,6 +237,7 @@ public enum ErrorCode {
     E1319(XLog.STD, "Invalid bundle coord job namespace, [{0}]"),
     E1320(XLog.STD, "Bundle Job change error, [{0}]"),
     E1321(XLog.STD, "Error evaluating coord name, [{0}]"),
+    E1322(XLog.STD, "Bundle status transit error: [{0}]"),
 
 
     E1400(XLog.STD, "doAs (proxyuser) failure"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java 
b/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java
new file mode 100644
index 0000000..3e057b2
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import org.apache.oozie.client.Job;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+
+/**
+ * StatusTransitXCommand is super class for Status Transit Command, it defines 
layout for Status Transit Commands. It
+ * tries change job status change after acquiring lock with zero timeout. 
StatusTransit Commands are not requeued.
+ */
+abstract public class StatusTransitXCommand extends XCommand<Void> {
+
+    /**
+     * Instantiates a new status transit x command.
+     *
+     * @param name the name
+     * @param type the type
+     * @param priority the priority
+     */
+    public StatusTransitXCommand(String name, String type, int priority) {
+        super(name, type, priority);
+    }
+
+    @Override
+    final protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    @Override
+    final protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    final protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+
+        final Job.Status jobStatus = getJobStatus();
+        try {
+            if (jobStatus != null) {
+                updateJobStatus(jobStatus);
+            }
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+
+        return null;
+
+    }
+
+    /**
+     * Gets the job status.
+     *
+     * @return the job status
+     * @throws CommandException the command exception
+     */
+    protected Job.Status getJobStatus() throws CommandException {
+        if (isTerminalState()) {
+            return getTerminalStatus();
+        }
+        if (isPausedState()) {
+            return getPausedState();
+        }
+        if (isSuspendedState()) {
+            return getSuspendedStatus();
+        }
+        if (isRunningState()) {
+            return getRunningState();
+        }
+        return null;
+    }
+
+    /**
+     * Checks if job is in terminal state.
+     *
+     * @return true, if is terminal state
+     */
+    protected abstract boolean isTerminalState();
+
+    /**
+     * Gets the job terminal status.
+     *
+     * @return the terminal status
+     */
+    protected abstract Job.Status getTerminalStatus();
+
+    /**
+     * Checks if job is in paused state.
+     *
+     * @return true, if job is in paused state
+     */
+    protected abstract boolean isPausedState();
+
+    /**
+     * Gets the job pause state.
+     *
+     * @return the paused state
+     */
+    protected abstract Job.Status getPausedState();
+
+    /**
+     * Checks if is in suspended state.
+     *
+     * @return true, if job is in suspended state
+     */
+    protected abstract boolean isSuspendedState();
+
+    /**
+     * Gets the suspended status.
+     *
+     * @return the suspended status
+     */
+    protected abstract Job.Status getSuspendedStatus();
+
+    /**
+     * Checks if job is in running state.
+     *
+     * @return true, if job is in running state
+     */
+    protected abstract boolean isRunningState();
+
+    /**
+     * Gets the job running state.
+     *
+     * @return the running state
+     */
+    protected abstract Job.Status getRunningState();
+
+    /**
+     * Update job status.
+     *
+     * @param status the status
+     * @throws JPAExecutorException the JPA executor exception
+     * @throws CommandException the command exception
+     */
+    protected abstract void updateJobStatus(Job.Status status) throws 
JPAExecutorException, CommandException;
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
new file mode 100644
index 0000000..d6a3197
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.bundle;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.StatusTransitXCommand;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
+import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.StatusUtils;
+
+/**
+ * BundleStatusTransitXCommand update job's status according to its child 
actions' status. If all child actions' pending
+ * flag equals 0 (job done), we reset the job's pending flag to 0. If all 
child actions are succeeded, we set the job's
+ * status to SUCCEEDED.
+ */
+public class BundleStatusTransitXCommand extends StatusTransitXCommand {
+
+    private String jobId;
+    private List<BundleActionBean> bundleActions;
+    private BundleJobBean bundleJob;
+    private boolean foundPending;
+    private HashMap<Job.Status, Integer> bundleActionStatus = new 
HashMap<Job.Status, Integer>();
+
+    public BundleStatusTransitXCommand(String id) {
+        super("bundle_status_transit", "bundle_status_transit", 0);
+        this.jobId = id;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return jobId;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            bundleJob = BundleJobQueryExecutor.getInstance().get(
+                    BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, 
jobId);
+
+            bundleActions = BundleActionQueryExecutor.getInstance().getList(
+                    
BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
+            for (BundleActionBean bAction : bundleActions) {
+                int counter = 0;
+                if (bundleActionStatus.containsKey(bAction.getStatus())) {
+                    counter = bundleActionStatus.get(bAction.getStatus()) + 1;
+                }
+                else {
+                    ++counter;
+                }
+                bundleActionStatus.put(bAction.getStatus(), counter);
+                if (bAction.getCoordId() == null
+                        && (bAction.getStatus() == Job.Status.FAILED || 
bAction.getStatus() == Job.Status.KILLED) ) {
+                    new BundleKillXCommand(jobId).call();
+                    bundleJob = BundleJobQueryExecutor.getInstance().get(
+                            
BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId);
+                    bundleJob.setStatus(Job.Status.FAILED);
+                    bundleJob.setLastModifiedTime(new Date());
+                    
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS,
+                            bundleJob);
+                }
+
+                if (bAction.isPending()) {
+                    foundPending = true;
+                }
+            }
+            LogUtils.setLogInfo(bundleJob);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(ErrorCode.E1322, e);
+        }
+    }
+
+    @Override
+    protected Job.Status getJobStatus() throws CommandException {
+        Job.Status jobStatus = super.getJobStatus();
+        if (jobStatus == null) {
+            if (isPrepRunningState()) {
+                return getPrepRunningStatus();
+            }
+        }
+
+        return jobStatus;
+    }
+
+    @Override
+    protected boolean isTerminalState() {
+        return !foundPending
+                && bundleActions.size() == 
getActionStatusCount(Job.Status.SUCCEEDED)
+                        + getActionStatusCount(Job.Status.FAILED) + 
getActionStatusCount(Job.Status.KILLED)
+                        + getActionStatusCount(Job.Status.DONEWITHERROR);
+    }
+
+    @Override
+    protected Job.Status getTerminalStatus() {
+
+        // If all bundle action is done and bundle is killed, then don't 
change the status.
+        if (bundleJob.getStatus().equals(Job.Status.KILLED)) {
+            return Job.Status.KILLED;
+
+        }
+        // If all the bundle actions are succeeded then bundle job should be 
succeeded.
+        if (bundleActions.size() == 
getActionStatusCount(Job.Status.SUCCEEDED)) {
+            return Job.Status.SUCCEEDED;
+
+        }
+        else if (bundleActions.size() == 
getActionStatusCount(Job.Status.KILLED)) {
+            // If all the bundle actions are KILLED then bundle job should be 
KILLED.
+            return Job.Status.KILLED;
+        }
+        else if (bundleActions.size() == 
getActionStatusCount(Job.Status.FAILED)) {
+            // If all the bundle actions are FAILED then bundle job should be 
FAILED.
+            return Job.Status.FAILED;
+        }
+        else {
+            return Job.Status.DONEWITHERROR;
+
+        }
+    }
+
+    @Override
+    protected boolean isPausedState() {
+
+        if (bundleJob.getStatus() == Job.Status.PAUSED || 
bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
+            return true;
+        }
+        else {
+            return getBottomUpPauseStatus() != null;
+        }
+    }
+
+    @Override
+    protected Job.Status getPausedState() {
+        if (bundleJob.getStatus() == Job.Status.PAUSED || 
bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
+            if (hasTerminatedActions() || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+                    || 
bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
+                    || 
bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+                return Job.Status.PAUSEDWITHERROR;
+            }
+            else {
+                return Job.Status.PAUSED;
+            }
+        }
+        return getBottomUpPauseStatus();
+
+    }
+
+    @Override
+    protected boolean isSuspendedState() {
+        if (bundleJob.getStatus() == Job.Status.SUSPENDED || 
bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            return true;
+        }
+
+        return getBottomUpSuspendedState() != null;
+
+    }
+
+    @Override
+    protected Job.Status getSuspendedStatus() {
+        if (bundleJob.getStatus() == Job.Status.SUSPENDED || 
bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            if (hasTerminatedActions() || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+                    || 
bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+                return Job.Status.SUSPENDEDWITHERROR;
+            }
+            else {
+                return Job.Status.SUSPENDED;
+            }
+
+        }
+        return getBottomUpSuspendedState();
+
+    }
+
+    @Override
+    protected boolean isRunningState() {
+        return true;
+    }
+
+    @Override
+    protected Status getRunningState() {
+        if (bundleJob.getStatus() != Job.Status.PREP) {
+            return getRunningStatus(bundleActionStatus);
+        }
+        else
+            return null;
+    }
+
+    @Override
+    protected void updateJobStatus(Job.Status bundleStatus) throws 
JPAExecutorException {
+        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus + 
"' from '" + bundleJob.getStatus() + "'");
+
+        String jobId = bundleJob.getId();
+        // Update the Bundle Job
+        // Check for backward support when RUNNINGWITHERROR, 
SUSPENDEDWITHERROR and
+        // PAUSEDWITHERROR is not supported
+        
bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
+        if (foundPending) {
+            bundleJob.setPending();
+            LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
+        }
+        else {
+            bundleJob.resetPending();
+            LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
+        }
+        
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
+                bundleJob);
+    }
+
+    /**
+     * bottom up; check the status of parent through their children.
+     *
+     * @return the bottom up pause status
+     */
+    private Job.Status getBottomUpPauseStatus() {
+
+        if (bundleActionStatus.containsKey(Job.Status.PAUSED)
+                && bundleActions.size() == 
bundleActionStatus.get(Job.Status.PAUSED)) {
+            return Job.Status.PAUSED;
+
+        }
+        else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
+                && bundleActions.size() == 
getActionStatusCount(Job.Status.PAUSED)
+                        + getActionStatusCount(Job.Status.PAUSEDWITHERROR)) {
+            return Job.Status.PAUSEDWITHERROR;
+        }
+
+        return null;
+    }
+
+    /**
+     * Bottom up update status of parent from the status of its children.
+     *
+     * @return the bottom up suspended state
+     */
+    private Job.Status getBottomUpSuspendedState() {
+
+        if (!foundPending && 
bundleActionStatus.containsKey(Job.Status.SUSPENDED)
+                || 
bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
+
+            if (bundleActions.size() == 
getActionStatusCount(Job.Status.SUSPENDED)
+                    + getActionStatusCount(Job.Status.SUCCEEDED)) {
+                return Job.Status.SUSPENDED;
+            }
+            else if (bundleActions.size() == 
getActionStatusCount(Job.Status.SUSPENDEDWITHERROR)
+                    + bundleActionStatus.get(Job.Status.SUSPENDED) + 
getActionStatusCount(Job.Status.SUCCEEDED)
+                    + getActionStatusCount(Job.Status.KILLED) + 
getActionStatusCount(Job.Status.FAILED)
+                    + getActionStatusCount(Job.Status.DONEWITHERROR)) {
+                return Job.Status.SUSPENDEDWITHERROR;
+
+            }
+        }
+        return null;
+    }
+
+    private boolean hasTerminatedActions() {
+        return bundleActionStatus.containsKey(Job.Status.KILLED) || 
bundleActionStatus.containsKey(Job.Status.FAILED)
+                || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR);
+
+    }
+
+    private boolean isPrepRunningState() {
+        return !foundPending && bundleActionStatus.containsKey(Job.Status.PREP)
+                && bundleActions.size() > 
bundleActionStatus.get(Job.Status.PREP);
+    }
+
+    private Status getPrepRunningStatus() {
+        return getRunningStatus(bundleActionStatus);
+
+    }
+
+    private int getActionStatusCount(final Job.Status status) {
+
+        if (bundleActionStatus.containsKey(status)) {
+            return bundleActionStatus.get(status);
+        }
+        else {
+            return 0;
+        }
+    }
+
+    private Job.Status getRunningStatus(HashMap<Job.Status, Integer> 
actionStatus) {
+        if (actionStatus.containsKey(Job.Status.FAILED) || 
actionStatus.containsKey(Job.Status.KILLED)
+                || actionStatus.containsKey(Job.Status.DONEWITHERROR)
+                || actionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
+            return Job.Status.RUNNINGWITHERROR;
+        }
+        else {
+            return Job.Status.RUNNING;
+        }
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java
new file mode 100644
index 0000000..2c5aab8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.StatusTransitXCommand;
+import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.StatusUtils;
+
+/**
+ * CoordStatusTransitXCommand update coord job's status according to its child 
actions' status. If all child actions'
+ * pending flag equals 0 (job done), we reset the job's pending flag to 0. If 
all child actions are succeeded, we set
+ * the job's status to SUCCEEDED.
+ */
+public class CoordStatusTransitXCommand extends StatusTransitXCommand {
+
+    private final String jobId;
+    private CoordinatorJobBean coordJob;
+    int coordActionCount;
+    private final Map<CoordinatorAction.Status, Integer> coordActionStatus = 
new HashMap<CoordinatorAction.Status, Integer>();
+    boolean isPending = false;
+
+    final boolean backwardSupportForCoordStatus = Services.get().getConf()
+            
.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
+
+    public CoordStatusTransitXCommand(String jobId) {
+        super("coord_status_transit", "coord_status_transit", 0);
+        this.jobId = jobId;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return jobId;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            coordJob = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+            List<CoordinatorActionBean> coordActionStatusList = 
CoordActionQueryExecutor.getInstance().getList(
+                    CoordActionQuery.GET_COORD_ACTIONS_STATUS_UNIGNORED, 
jobId);
+
+            long count = (Long) 
CoordActionQueryExecutor.getInstance().getSingleValue(
+                    CoordActionQuery.GET_COORD_ACTIONS_PENDING_COUNT, jobId);
+            if (count > 0) {
+                isPending = true;
+            }
+
+            for (CoordinatorAction coordAction : coordActionStatusList) {
+                int counter = 0;
+                if (coordActionStatus.containsKey(coordAction.getStatus())) {
+                    counter = coordActionStatus.get(coordAction.getStatus()) + 
1;
+                }
+                else {
+                    ++counter;
+                }
+                coordActionStatus.put(coordAction.getStatus(), counter);
+            }
+            coordActionCount = coordActionStatusList.size();
+        }
+        catch (JPAExecutorException jpae) {
+            throw new CommandException(ErrorCode.E1025, jpae);
+        }
+        LogUtils.setLogInfo(this.coordJob);
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+        // if namespace 0.1 is used and backward support is true, then ignore 
this coord job
+        if (backwardSupportForCoordStatus == true && 
coordJob.getAppNamespace() != null
+                && 
coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
+            throw new CommandException(ErrorCode.E1025,
+                    " Coord namespace is 0.1 and 
backward.support.for.coord.status is set");
+        }
+
+    }
+
+    @Override
+    protected Job.Status getJobStatus() throws CommandException {
+        Job.Status jobStatus = super.getJobStatus();
+        if (jobStatus == null) {
+            jobStatus = coordJob.getStatus();
+        }
+
+        return jobStatus;
+    }
+
+    @Override
+    protected boolean isTerminalState() {
+        return (coordJob.isDoneMaterialization() || coordJob.getStatus() == 
Job.Status.FAILED ||
+                coordJob.getStatus() == Job.Status.KILLED) && 
isCoordTerminalStatus(coordActionCount);
+    }
+
+    @Override
+    protected Status getTerminalStatus() {
+
+        // If all coord action is done and coord is killed, then don't change 
the status.
+        if (coordJob.getStatus().equals(Job.Status.KILLED)) {
+            return Job.Status.KILLED;
+
+        }
+        // If all the coordinator actions are succeeded then coordinator job 
should be succeeded.
+        if (coordActionCount == 
(getStatusCount(CoordinatorAction.Status.SUCCEEDED)
+                + getStatusCount(CoordinatorAction.Status.SKIPPED)) && 
coordJob.isDoneMaterialization()) {
+            return Job.Status.SUCCEEDED;
+
+        }
+        else if (coordActionCount == 
getStatusCount(CoordinatorAction.Status.KILLED)) {
+            // If all the coordinator actions are KILLED then coordinator job 
should be KILLED.
+            return Job.Status.KILLED;
+
+        }
+        else if (coordActionCount == 
getStatusCount(CoordinatorAction.Status.FAILED)) {
+            // If all the coordinator actions are FAILED then coordinator job 
should be FAILED.
+            return Job.Status.FAILED;
+
+        }
+        else {
+            return Job.Status.DONEWITHERROR;
+        }
+    }
+
+    @Override
+    protected boolean isPausedState() {
+        return coordJob.getStatus().equals(Job.Status.PAUSED)
+                || coordJob.getStatus().equals(Job.Status.PAUSEDWITHERROR);
+    }
+
+    @Override
+    protected Status getPausedState() {
+        return hasTerminatedActions() ? Job.Status.PAUSEDWITHERROR : 
Job.Status.PAUSED;
+    }
+
+    @Override
+    protected boolean isSuspendedState() {
+        if (coordJob.getStatus() == Job.Status.SUSPENDED || 
coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            return true;
+        }
+        else {
+            return getBottomUpSuspendedState() != null;
+        }
+    }
+
+    @Override
+    protected Status getSuspendedStatus() {
+        if (coordJob.getStatus() == Job.Status.SUSPENDED || 
coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            return hasTerminatedActions() ? Job.Status.SUSPENDEDWITHERROR : 
Job.Status.SUSPENDED;
+        }
+        else {
+            return getBottomUpSuspendedState();
+        }
+    }
+
+    @Override
+    protected boolean isRunningState() {
+        return coordJob.getStatus() != Job.Status.PREP;
+    }
+
+    @Override
+    protected Status getRunningState() {
+        return hasTerminatedActions() ? Job.Status.RUNNINGWITHERROR : 
Job.Status.RUNNING;
+    }
+
+    @Override
+    protected void updateJobStatus(Status coordStatus) throws 
JPAExecutorException, CommandException {
+        final Job.Status prevStatus = coordJob.getStatus();
+
+        boolean prevPending = coordJob.isPending();
+        if (isPending) {
+            coordJob.setPending();
+        }
+        else {
+            coordJob.resetPending();
+        }
+        boolean isPendingStateChanged = prevPending != coordJob.isPending();
+
+        // Update the Coord Job
+        if (coordJob.isTerminalStatus()
+                && (coordStatus == Job.Status.SUSPENDED || coordStatus == 
Job.Status.SUSPENDEDWITHERROR)) {
+            LOG.info("Coord Job [" + coordJob.getId() + "] status to " + 
coordStatus
+                    + " can not be updated as its already in Terminal state");
+            if (isPendingStateChanged) {
+                LOG.info("Pending for job  [" + coordJob.getId() + "] is 
changed to to '" + coordJob.isPending()
+                        + "' from '" + prevStatus + "'");
+                coordJob.setLastModifiedTime(new Date());
+                CoordJobQueryExecutor.getInstance().executeUpdate(
+                        CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, 
coordJob);
+            }
+            return;
+
+        }
+
+        // Check for backward support when RUNNINGWITHERROR, 
SUSPENDEDWITHERROR and PAUSEDWITHERROR is
+        // not supported
+        
coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
+        // Backward support when coordinator namespace is 0.1
+        coordJob.setStatus(StatusUtils.getStatus(coordJob));
+        if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
+            LOG.info("Set coordinator job [" + coordJob.getId() + "] status to 
'" + coordJob.getStatus() + "' from '"
+                    + prevStatus + "'");
+            coordJob.setLastModifiedTime(new Date());
+            
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
+                    coordJob);
+        }
+        // update bundle action only when status changes in coord job
+        if (coordJob.getBundleId() != null) {
+            if (!prevStatus.equals(coordJob.getStatus())) {
+                new BundleStatusUpdateXCommand(coordJob, prevStatus).call();
+            }
+        }
+    }
+
+    /**
+     * Bottom up look for children to check the parent's status only if 
materialization is done and all actions are
+     * non-pending.
+     *
+     * @return the bottom up suspended state
+     */
+    protected Job.Status getBottomUpSuspendedState() {
+        if (coordJob.isDoneMaterialization() && !isPending
+                && 
coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
+
+            if (coordActionCount == 
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
+                    + getStatusCount(CoordinatorAction.Status.SUCCEEDED)) {
+                return Job.Status.SUSPENDED;
+
+            }
+            else if (coordActionCount == 
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
+                    + getStatusCount(CoordinatorAction.Status.SUCCEEDED)
+                    + getStatusCount(CoordinatorAction.Status.KILLED) + 
getStatusCount(CoordinatorAction.Status.FAILED)
+                    + getStatusCount(CoordinatorAction.Status.TIMEDOUT)) {
+                return Job.Status.SUSPENDEDWITHERROR;
+
+            }
+        }
+        return null;
+    }
+
+    private boolean isCoordTerminalStatus(int coordActionsCount) {
+        return coordActionsCount == 
getStatusCount(CoordinatorAction.Status.SUCCEEDED)
+                + getStatusCount(CoordinatorAction.Status.FAILED) + 
getStatusCount(CoordinatorAction.Status.KILLED)
+                + getStatusCount(CoordinatorAction.Status.TIMEDOUT) + 
getStatusCount(CoordinatorAction.Status.SKIPPED);
+
+    }
+
+    private int getStatusCount(CoordinatorAction.Status status) {
+        int statusCount = 0;
+        if (coordActionStatus.containsKey(status)) {
+            statusCount = coordActionStatus.get(status);
+        }
+        return statusCount;
+    }
+
+    private boolean hasTerminatedActions() {
+        return coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+                || 
coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+                || 
coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
index 4071891..704da87 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
@@ -43,7 +43,6 @@ public class BundleActionQueryExecutor extends
         UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID,
         GET_BUNDLE_ACTION,
         GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE,
-        GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME,
         GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN,
         GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE
     };
@@ -99,9 +98,6 @@ public class BundleActionQueryExecutor extends
             case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
                 query.setParameter("bundleId", parameters[0]);
                 break;
-            case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME:
-                query.setParameter("lastModifiedTime", new 
Timestamp(((Date)parameters[0]).getTime()));
-                break;
             case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
                 Timestamp ts = new Timestamp(System.currentTimeMillis() - 
(Long)parameters[0] * 1000);
                 query.setParameter("lastModifiedTime", ts);
@@ -146,10 +142,6 @@ public class BundleActionQueryExecutor extends
             case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
                 bean = (BundleActionBean) ret;
                 break;
-            case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME:
-                bean = new BundleActionBean();
-                bean.setBundleId((String) ret);
-                break;
             case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
                 bean = new BundleActionBean();
                 arr = (Object[]) ret;

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
index ce7473d..a770aad 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -48,7 +49,8 @@ public class BundleJobQueryExecutor extends 
QueryExecutor<BundleJobBean, BundleJ
         GET_BUNDLE_JOB,
         GET_BUNDLE_JOB_STATUS,
         GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME,
-        GET_BUNDLE_JOB_ID_JOBXML_CONF
+        GET_BUNDLE_JOB_ID_JOBXML_CONF,
+        GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
     };
 
     private static BundleJobQueryExecutor instance = new 
BundleJobQueryExecutor();
@@ -134,6 +136,9 @@ public class BundleJobQueryExecutor extends 
QueryExecutor<BundleJobBean, BundleJ
             case GET_BUNDLE_JOB_STATUS:
                 query.setParameter("id", parameters[0]);
                 break;
+            case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
+                query.setParameter("lastModifiedTime", 
DateUtils.convertDateToTimestamp((Date)parameters[0]));
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot set parameters for "
                         + namedQuery.name());
@@ -206,6 +211,12 @@ public class BundleJobQueryExecutor extends 
QueryExecutor<BundleJobBean, BundleJ
                 bean.setJobXmlBlob((StringBlob) arr[1]);
                 bean.setConfBlob((StringBlob) arr[2]);
                 break;
+
+            case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
+                bean = new BundleJobBean();
+                bean.setId((String) ret);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index 0aee0e4..fc81a81 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -49,7 +49,9 @@ public class CoordActionQueryExecutor extends
         GET_COORD_ACTION,
         GET_COORD_ACTION_STATUS,
         GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
-        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
+        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
+        GET_COORD_ACTIONS_STATUS_UNIGNORED,
+        GET_COORD_ACTIONS_PENDING_COUNT
     };
 
     private static CoordActionQueryExecutor instance = new 
CoordActionQueryExecutor();
@@ -170,6 +172,13 @@ public class CoordActionQueryExecutor extends
             case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
                 query.setParameter("lastModifiedTime", new Timestamp(((Date) 
parameters[0]).getTime()));
                 break;
+            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
+                query.setParameter("jobId", parameters[0]);
+                break;
+            case GET_COORD_ACTIONS_PENDING_COUNT:
+                query.setParameter("jobId", parameters[0]);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot set parameters for "
                         + caQuery.name());
@@ -230,6 +239,13 @@ public class CoordActionQueryExecutor extends
                 bean = new CoordinatorActionBean();
                 bean.setStatusStr((String)ret);
                 break;
+            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setStatusStr((String)arr[0]);
+                bean.setPending((Integer)arr[1]);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot construct action bean for "
                         + namedQuery.name());
@@ -239,6 +255,13 @@ public class CoordActionQueryExecutor extends
 
     @Override
     public Object getSingleValue(CoordActionQuery namedQuery, Object... 
parameters) throws JPAExecutorException {
-        throw new UnsupportedOperationException();
+        JPAService jpaService = Services.get().get(JPAService.class);
+        EntityManager em = jpaService.getEntityManager();
+        Query query = getSelectQuery(namedQuery, em, parameters);
+        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+        if (ret == null) {
+            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+        }
+        return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java
deleted file mode 100644
index c96e5d9..0000000
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Get the count of pending coordinator actions of a coordinator job
- */
-public class CoordJobGetPendingActionsCountJPAExecutor implements 
JPAExecutor<Integer> {
-
-    private String coordJobId = null;
-
-    public CoordJobGetPendingActionsCountJPAExecutor(String coordJobId) {
-        ParamChecker.notNull(coordJobId, "coordJobId");
-        this.coordJobId = coordJobId;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordJobGetPendingActionsCountJPAExecutor";
-    }
-
-    @Override
-    public Integer execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_COORD_ACTIONS_PENDING_COUNT");
-            q.setParameter("jobId", coordJobId);
-            Long count = (Long) q.getSingleResult();
-            return Integer.valueOf(count.intValue());
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 169823b..4bccef4 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -64,7 +64,8 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
         GET_COORD_JOB_STATUS_PARENTID,
         GET_COORD_JOBS_CHANGED,
         GET_COORD_JOBS_OLDER_FOR_MATERILZATION,
-        GET_COORD_FOR_ABANDONEDCHECK
+        GET_COORD_FOR_ABANDONEDCHECK,
+        GET_COORD_IDS_FOR_STATUS_TRANSIT
     };
 
     private static CoordJobQueryExecutor instance = new 
CoordJobQueryExecutor();
@@ -214,6 +215,10 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
                 query.setParameter(2, (Timestamp) parameters[1]);
                 break;
 
+            case GET_COORD_IDS_FOR_STATUS_TRANSIT:
+                query.setParameter("lastModifiedTime", new Timestamp(((Date) 
parameters[0]).getTime()));
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot set parameters for "
                         + namedQuery.name());
@@ -343,6 +348,11 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
                 bean.setAppName((String) arr[3]);
                 break;
 
+            case GET_COORD_IDS_FOR_STATUS_TRANSIT:
+                bean = new CoordinatorJobBean();
+                bean.setId((String) ret);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot construct job bean for "
                         + namedQuery.name());
@@ -382,5 +392,4 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
     public Object getSingleValue(CoordJobQuery namedQuery, Object... 
parameters) throws JPAExecutorException {
         throw new UnsupportedOperationException();
     }
-
 }

Reply via email to