OOZIE-1940 StatusTransitService has race condition
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b87686b7 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b87686b7 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b87686b7 Branch: refs/heads/master Commit: b87686b74ad2ab888b3a3b3b2521da66fe35a0c4 Parents: 1292c7f Author: Purshotam Shah <[email protected]> Authored: Tue Sep 30 10:17:00 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue Sep 30 10:17:00 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/BundleActionBean.java | 2 - .../java/org/apache/oozie/BundleJobBean.java | 3 + .../org/apache/oozie/CoordinatorActionBean.java | 2 +- .../org/apache/oozie/CoordinatorJobBean.java | 5 +- .../main/java/org/apache/oozie/ErrorCode.java | 3 + .../oozie/command/StatusTransitXCommand.java | 160 +++++ .../bundle/BundleStatusTransitXCommand.java | 325 +++++++++ .../coord/CoordStatusTransitXCommand.java | 297 ++++++++ .../executor/jpa/BundleActionQueryExecutor.java | 8 - .../executor/jpa/BundleJobQueryExecutor.java | 13 +- .../executor/jpa/CoordActionQueryExecutor.java | 27 +- ...ordJobGetPendingActionsCountJPAExecutor.java | 57 -- .../executor/jpa/CoordJobQueryExecutor.java | 13 +- .../oozie/service/StatusTransitService.java | 695 ++----------------- .../jpa/TestBundleActionQueryExecutor.java | 23 +- .../jpa/TestBundleJobQueryExecutor.java | 25 +- ...ordJobGetPendingActionsCountJPAExecutor.java | 68 -- .../oozie/service/TestStatusTransitService.java | 155 ++++- release-log.txt | 1 + 19 files changed, 1076 insertions(+), 806 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/BundleActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleActionBean.java 
b/core/src/main/java/org/apache/oozie/BundleActionBean.java index 963497d..65bfe8c 100644 --- a/core/src/main/java/org/apache/oozie/BundleActionBean.java +++ b/core/src/main/java/org/apache/oozie/BundleActionBean.java @@ -57,8 +57,6 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) from BundleActionBean w"), - @NamedQuery(name = "GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select w.bundleId from BundleActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"), - @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = "select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= :lastModifiedTime"), @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"), http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/BundleJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java index 76e76b7..4dbffc3 100644 --- a/core/src/main/java/org/apache/oozie/BundleJobBean.java +++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java @@ -107,6 +107,9 @@ import org.json.simple.JSONObject; @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"), + @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.pending = 1)"), + + @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w 
where w.id = :id") }) @Table(name = "BUNDLE_JOBS") public class BundleJobBean implements Writable, BundleJob, JsonBean { http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index cc5596b..c5a6ca8 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -137,7 +137,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), // Query to retrieve status of Coordinator actions - @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"), + @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"), // Query to retrieve status of Coordinator actions @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"), http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 03757dd..8f11645 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -133,8 +133,11 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOB_STATUS", query 
= "select w.statusStr from CoordinatorJobBean w where w.id = :id"), - @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")}) + @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id"), + @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN ('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1) and w.statusStr <> 'IGNORED'") + +}) @NamedNativeQueries({ @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = "select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT' or a.STATUS = 'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group by b.job_id having count(*) > 0 ) successJobs on failedJobs.job_id = successJobs.job_id where successJobs.job_id is null )") http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index f9f88f4..4afeb6c 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -207,6 +207,8 @@ public enum ErrorCode { E1022(XLog.STD, "Cannot delete running/completed coordinator action: [{0}]"), E1023(XLog.STD, "Coord Job update Error: [{0}]"), E1024(XLog.STD, 
"Cannot run ignore command: [{0}]"), + E1025(XLog.STD, "Coord status transit error: [{0}]"), + E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"), @@ -235,6 +237,7 @@ public enum ErrorCode { E1319(XLog.STD, "Invalid bundle coord job namespace, [{0}]"), E1320(XLog.STD, "Bundle Job change error, [{0}]"), E1321(XLog.STD, "Error evaluating coord name, [{0}]"), + E1322(XLog.STD, "Bundle status transit error: [{0}]"), E1400(XLog.STD, "doAs (proxyuser) failure"), http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java new file mode 100644 index 0000000..3e057b2 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/StatusTransitXCommand.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.oozie.command;
+
+import org.apache.oozie.client.Job;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+
+/**
+ * StatusTransitXCommand is the super class for status transit commands; it defines their common layout. It tries to
+ * change the job status after acquiring the lock with a zero timeout. Status transit commands are not requeued.
+ */
+abstract public class StatusTransitXCommand extends XCommand<Void> {
+
+    /**
+     * Instantiates a new status transit x command.
+     *
+     * @param name the name
+     * @param type the type
+     * @param priority the priority
+     */
+    public StatusTransitXCommand(String name, String type, int priority) {
+        super(name, type, priority);
+    }
+
+    @Override
+    final protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    @Override
+    final protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    final protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+
+        final Job.Status jobStatus = getJobStatus();
+        try {
+            if (jobStatus != null) {
+                updateJobStatus(jobStatus);
+            }
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+
+        return null;
+
+    }
+
+    /**
+     * Gets the job status by checking the terminal, paused, suspended and running states, in that order.
+     *
+     * @return the status of the first state check that matches, or null if none matches
+     * @throws CommandException the command exception
+     */
+    protected Job.Status getJobStatus() throws CommandException {
+        if (isTerminalState()) {
+            return getTerminalStatus();
+        }
+        if (isPausedState()) {
+            return getPausedState();
+        }
+        if (isSuspendedState()) {
+            return getSuspendedStatus();
+        }
+        if (isRunningState()) {
+            return getRunningState();
+        }
+        return null;
+    }
+
+    /**
+     * Checks if job is in terminal state.
+     *
+     * @return true, if job is in terminal state
+     */
+    protected abstract boolean isTerminalState();
+
+    /**
+     * Gets the job terminal status.
+     *
+     * @return the terminal status
+     */
+    protected abstract Job.Status getTerminalStatus();
+
+    /**
+     * Checks if job is in paused state.
+ * + * @return true, if job is in paused state + */ + protected abstract boolean isPausedState(); + + /** + * Gets the job pause state. + * + * @return the paused state + */ + protected abstract Job.Status getPausedState(); + + /** + * Checks if is in suspended state. + * + * @return true, if job is in suspended state + */ + protected abstract boolean isSuspendedState(); + + /** + * Gets the suspended status. + * + * @return the suspended status + */ + protected abstract Job.Status getSuspendedStatus(); + + /** + * Checks if job is in running state. + * + * @return true, if job is in running state + */ + protected abstract boolean isRunningState(); + + /** + * Gets the job running state. + * + * @return the running state + */ + protected abstract Job.Status getRunningState(); + + /** + * Update job status. + * + * @param status the status + * @throws JPAExecutorException the JPA executor exception + * @throws CommandException the command exception + */ + protected abstract void updateJobStatus(Job.Status status) throws JPAExecutorException, CommandException; + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java new file mode 100644 index 0000000..d6a3197 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.bundle; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; + +import org.apache.oozie.BundleActionBean; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.Job.Status; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.StatusTransitXCommand; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.StatusUtils; + +/** + * BundleStatusTransitXCommand update job's status according to its child actions' status. If all child actions' pending + * flag equals 0 (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's + * status to SUCCEEDED. 
+ */ +public class BundleStatusTransitXCommand extends StatusTransitXCommand { + + private String jobId; + private List<BundleActionBean> bundleActions; + private BundleJobBean bundleJob; + private boolean foundPending; + private HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); + + public BundleStatusTransitXCommand(String id) { + super("bundle_status_transit", "bundle_status_transit", 0); + this.jobId = id; + } + + @Override + public String getEntityKey() { + return jobId; + } + + @Override + protected void loadState() throws CommandException { + try { + bundleJob = BundleJobQueryExecutor.getInstance().get( + BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId); + + bundleActions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId); + for (BundleActionBean bAction : bundleActions) { + int counter = 0; + if (bundleActionStatus.containsKey(bAction.getStatus())) { + counter = bundleActionStatus.get(bAction.getStatus()) + 1; + } + else { + ++counter; + } + bundleActionStatus.put(bAction.getStatus(), counter); + if (bAction.getCoordId() == null + && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) { + new BundleKillXCommand(jobId).call(); + bundleJob = BundleJobQueryExecutor.getInstance().get( + BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId); + bundleJob.setStatus(Job.Status.FAILED); + bundleJob.setLastModifiedTime(new Date()); + BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS, + bundleJob); + } + + if (bAction.isPending()) { + foundPending = true; + } + } + LogUtils.setLogInfo(bundleJob); + } + catch (JPAExecutorException e) { + throw new CommandException(ErrorCode.E1322, e); + } + } + + @Override + protected Job.Status getJobStatus() throws CommandException { + Job.Status jobStatus = super.getJobStatus(); + if (jobStatus == null) { + if 
(isPrepRunningState()) { + return getPrepRunningStatus(); + } + } + + return jobStatus; + } + + @Override + protected boolean isTerminalState() { + return !foundPending + && bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED) + + getActionStatusCount(Job.Status.FAILED) + getActionStatusCount(Job.Status.KILLED) + + getActionStatusCount(Job.Status.DONEWITHERROR); + } + + @Override + protected Job.Status getTerminalStatus() { + + // If all bundle action is done and bundle is killed, then don't change the status. + if (bundleJob.getStatus().equals(Job.Status.KILLED)) { + return Job.Status.KILLED; + + } + // If all the bundle actions are succeeded then bundle job should be succeeded. + if (bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED)) { + return Job.Status.SUCCEEDED; + + } + else if (bundleActions.size() == getActionStatusCount(Job.Status.KILLED)) { + // If all the bundle actions are KILLED then bundle job should be KILLED. + return Job.Status.KILLED; + } + else if (bundleActions.size() == getActionStatusCount(Job.Status.FAILED)) { + // If all the bundle actions are FAILED then bundle job should be FAILED. 
+ return Job.Status.FAILED; + } + else { + return Job.Status.DONEWITHERROR; + + } + } + + @Override + protected boolean isPausedState() { + + if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) { + return true; + } + else { + return getBottomUpPauseStatus() != null; + } + } + + @Override + protected Job.Status getPausedState() { + if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) { + if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) + || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) + || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { + return Job.Status.PAUSEDWITHERROR; + } + else { + return Job.Status.PAUSED; + } + } + return getBottomUpPauseStatus(); + + } + + @Override + protected boolean isSuspendedState() { + if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) { + return true; + } + + return getBottomUpSuspendedState() != null; + + } + + @Override + protected Job.Status getSuspendedStatus() { + if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) { + if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) + || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { + return Job.Status.SUSPENDEDWITHERROR; + } + else { + return Job.Status.SUSPENDED; + } + + } + return getBottomUpSuspendedState(); + + } + + @Override + protected boolean isRunningState() { + return true; + } + + @Override + protected Status getRunningState() { + if (bundleJob.getStatus() != Job.Status.PREP) { + return getRunningStatus(bundleActionStatus); + } + else + return null; + } + + @Override + protected void updateJobStatus(Job.Status bundleStatus) throws JPAExecutorException { + LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus + "' from '" + 
bundleJob.getStatus() + "'"); + + String jobId = bundleJob.getId(); + // Update the Bundle Job + // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and + // PAUSEDWITHERROR is not supported + bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus)); + if (foundPending) { + bundleJob.setPending(); + LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); + } + else { + bundleJob.resetPending(); + LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); + } + BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, + bundleJob); + } + + /** + * bottom up; check the status of parent through their children. + * + * @return the bottom up pause status + */ + private Job.Status getBottomUpPauseStatus() { + + if (bundleActionStatus.containsKey(Job.Status.PAUSED) + && bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) { + return Job.Status.PAUSED; + + } + else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR) + && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED) + + getActionStatusCount(Job.Status.PAUSEDWITHERROR)) { + return Job.Status.PAUSEDWITHERROR; + } + + return null; + } + + /** + * Bottom up update status of parent from the status of its children. 
+     * Only evaluated when no child action is pending; statuses with no actions count as zero.
+     * @return SUSPENDED or SUSPENDEDWITHERROR as implied by the child actions, or null if neither applies
+     */
+    private Job.Status getBottomUpSuspendedState() {
+
+        if (!foundPending && (bundleActionStatus.containsKey(Job.Status.SUSPENDED)
+                || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR))) {
+
+            if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDED)
+                    + getActionStatusCount(Job.Status.SUCCEEDED)) {
+                return Job.Status.SUSPENDED;
+            }
+            else if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDEDWITHERROR)
+                    + getActionStatusCount(Job.Status.SUSPENDED) + getActionStatusCount(Job.Status.SUCCEEDED)
+                    + getActionStatusCount(Job.Status.KILLED) + getActionStatusCount(Job.Status.FAILED)
+                    + getActionStatusCount(Job.Status.DONEWITHERROR)) {
+                return Job.Status.SUSPENDEDWITHERROR;
+
+            }
+        }
+        return null;
+    }
+
+    private boolean hasTerminatedActions() {
+        return bundleActionStatus.containsKey(Job.Status.KILLED) || bundleActionStatus.containsKey(Job.Status.FAILED)
+                || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR);
+
+    }
+
+    private boolean isPrepRunningState() {
+        return !foundPending && bundleActionStatus.containsKey(Job.Status.PREP)
+                && bundleActions.size() > bundleActionStatus.get(Job.Status.PREP);
+    }
+
+    private Status getPrepRunningStatus() {
+        return getRunningStatus(bundleActionStatus);
+
+    }
+
+    private int getActionStatusCount(final Job.Status status) {
+
+        if (bundleActionStatus.containsKey(status)) {
+            return bundleActionStatus.get(status);
+        }
+        else {
+            return 0;
+        }
+    }
+
+    private Job.Status getRunningStatus(HashMap<Job.Status, Integer> actionStatus) {
+        if (actionStatus.containsKey(Job.Status.FAILED) || actionStatus.containsKey(Job.Status.KILLED)
+                || actionStatus.containsKey(Job.Status.DONEWITHERROR)
+                || actionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
+            return Job.Status.RUNNINGWITHERROR;
+        }
+        else {
+            return Job.Status.RUNNING;
+        }
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java new file mode 100644 index 0000000..2c5aab8 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordStatusTransitXCommand.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.coord; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.Job.Status; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.StatusTransitXCommand; +import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.SchemaService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.StatusTransitService; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.StatusUtils; + +/** + * CoordStatusTransitXCommand update coord job's status according to its child actions' status. If all child actions' + * pending flag equals 0 (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set + * the job's status to SUCCEEDED. 
+ */ +public class CoordStatusTransitXCommand extends StatusTransitXCommand { + + private final String jobId; + private CoordinatorJobBean coordJob; + int coordActionCount; + private final Map<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>(); + boolean isPending = false; + + final boolean backwardSupportForCoordStatus = Services.get().getConf() + .getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); + + public CoordStatusTransitXCommand(String jobId) { + super("coord_status_transit", "coord_status_transit", 0); + this.jobId = jobId; + } + + @Override + public String getEntityKey() { + return jobId; + } + + @Override + protected void loadState() throws CommandException { + try { + coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); + List<CoordinatorActionBean> coordActionStatusList = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_COORD_ACTIONS_STATUS_UNIGNORED, jobId); + + long count = (Long) CoordActionQueryExecutor.getInstance().getSingleValue( + CoordActionQuery.GET_COORD_ACTIONS_PENDING_COUNT, jobId); + if (count > 0) { + isPending = true; + } + + for (CoordinatorAction coordAction : coordActionStatusList) { + int counter = 0; + if (coordActionStatus.containsKey(coordAction.getStatus())) { + counter = coordActionStatus.get(coordAction.getStatus()) + 1; + } + else { + ++counter; + } + coordActionStatus.put(coordAction.getStatus(), counter); + } + coordActionCount = coordActionStatusList.size(); + } + catch (JPAExecutorException jpae) { + throw new CommandException(ErrorCode.E1025, jpae); + } + LogUtils.setLogInfo(this.coordJob); + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + // if namespace 0.1 is used and backward support is true, then ignore this coord job + if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null + && 
coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { + throw new CommandException(ErrorCode.E1025, + " Coord namespace is 0.1 and backward.support.for.coord.status is set"); + } + + } + + @Override + protected Job.Status getJobStatus() throws CommandException { + Job.Status jobStatus = super.getJobStatus(); + if (jobStatus == null) { + jobStatus = coordJob.getStatus(); + } + + return jobStatus; + } + + @Override + protected boolean isTerminalState() { + return (coordJob.isDoneMaterialization() || coordJob.getStatus() == Job.Status.FAILED || + coordJob.getStatus() == Job.Status.KILLED) && isCoordTerminalStatus(coordActionCount); + } + + @Override + protected Status getTerminalStatus() { + + // If all coord action is done and coord is killed, then don't change the status. + if (coordJob.getStatus().equals(Job.Status.KILLED)) { + return Job.Status.KILLED; + + } + // If all the coordinator actions are succeeded then coordinator job should be succeeded. + if (coordActionCount == (getStatusCount(CoordinatorAction.Status.SUCCEEDED) + + getStatusCount(CoordinatorAction.Status.SKIPPED)) && coordJob.isDoneMaterialization()) { + return Job.Status.SUCCEEDED; + + } + else if (coordActionCount == getStatusCount(CoordinatorAction.Status.KILLED)) { + // If all the coordinator actions are KILLED then coordinator job should be KILLED. + return Job.Status.KILLED; + + } + else if (coordActionCount == getStatusCount(CoordinatorAction.Status.FAILED)) { + // If all the coordinator actions are FAILED then coordinator job should be FAILED. + return Job.Status.FAILED; + + } + else { + return Job.Status.DONEWITHERROR; + } + } + + @Override + protected boolean isPausedState() { + return coordJob.getStatus().equals(Job.Status.PAUSED) + || coordJob.getStatus().equals(Job.Status.PAUSEDWITHERROR); + } + + @Override + protected Status getPausedState() { + return hasTerminatedActions() ? 
Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
+    }
+
+    @Override
+    protected boolean isSuspendedState() {
+        if (coordJob.getStatus() == Job.Status.SUSPENDED || coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            return true;
+        }
+        else {
+            return getBottomUpSuspendedState() != null;
+        }
+    }
+
+    @Override
+    protected Status getSuspendedStatus() {
+        if (coordJob.getStatus() == Job.Status.SUSPENDED || coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+            return hasTerminatedActions() ? Job.Status.SUSPENDEDWITHERROR : Job.Status.SUSPENDED;
+        }
+        else {
+            return getBottomUpSuspendedState();
+        }
+    }
+
+    @Override
+    protected boolean isRunningState() {
+        return coordJob.getStatus() != Job.Status.PREP;
+    }
+
+    @Override
+    protected Status getRunningState() {
+        return hasTerminatedActions() ? Job.Status.RUNNINGWITHERROR : Job.Status.RUNNING;
+    }
+
+    @Override
+    protected void updateJobStatus(Status coordStatus) throws JPAExecutorException, CommandException {
+        final Job.Status prevStatus = coordJob.getStatus();
+
+        boolean prevPending = coordJob.isPending();
+        if (isPending) {
+            coordJob.setPending();
+        }
+        else {
+            coordJob.resetPending();
+        }
+        boolean isPendingStateChanged = prevPending != coordJob.isPending();
+
+        // Terminal jobs are never moved to SUSPENDED/SUSPENDEDWITHERROR; only a pending-flag change is saved
+        if (coordJob.isTerminalStatus()
+                && (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR)) {
+            LOG.info("Coord Job [" + coordJob.getId() + "] status to " + coordStatus
+                    + " can not be updated as its already in Terminal state");
+            if (isPendingStateChanged) {
+                LOG.info("Pending for job [" + coordJob.getId() + "] is changed to '" + coordJob.isPending()
+                        + "' from '" + prevPending + "'");
+                coordJob.setLastModifiedTime(new Date());
+                CoordJobQueryExecutor.getInstance().executeUpdate(
+                        CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
+            }
+            return;
+
+        }
+
+        // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
+        // not supported
+
coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus)); + // Backward support when coordinator namespace is 0.1 + coordJob.setStatus(StatusUtils.getStatus(coordJob)); + if (coordJob.getStatus() != prevStatus || isPendingStateChanged) { + LOG.info("Set coordinator job [" + coordJob.getId() + "] status to '" + coordJob.getStatus() + "' from '" + + prevStatus + "'"); + coordJob.setLastModifiedTime(new Date()); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, + coordJob); + } + // update bundle action only when status changes in coord job + if (coordJob.getBundleId() != null) { + if (!prevStatus.equals(coordJob.getStatus())) { + new BundleStatusUpdateXCommand(coordJob, prevStatus).call(); + } + } + } + + /** + * Bottom up look for children to check the parent's status only if materialization is done and all actions are + * non-pending. + * + * @return the bottom up suspended state + */ + protected Job.Status getBottomUpSuspendedState() { + if (coordJob.isDoneMaterialization() && !isPending + && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) { + + if (coordActionCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + + getStatusCount(CoordinatorAction.Status.SUCCEEDED)) { + return Job.Status.SUSPENDED; + + } + else if (coordActionCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + + getStatusCount(CoordinatorAction.Status.SUCCEEDED) + + getStatusCount(CoordinatorAction.Status.KILLED) + getStatusCount(CoordinatorAction.Status.FAILED) + + getStatusCount(CoordinatorAction.Status.TIMEDOUT)) { + return Job.Status.SUSPENDEDWITHERROR; + + } + } + return null; + } + + private boolean isCoordTerminalStatus(int coordActionsCount) { + return coordActionsCount == getStatusCount(CoordinatorAction.Status.SUCCEEDED) + + getStatusCount(CoordinatorAction.Status.FAILED) + getStatusCount(CoordinatorAction.Status.KILLED) + + 
getStatusCount(CoordinatorAction.Status.TIMEDOUT) + getStatusCount(CoordinatorAction.Status.SKIPPED); + + } + + private int getStatusCount(CoordinatorAction.Status status) { + int statusCount = 0; + if (coordActionStatus.containsKey(status)) { + statusCount = coordActionStatus.get(status); + } + return statusCount; + } + + private boolean hasTerminatedActions() { + return coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) + || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) + || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java index 4071891..704da87 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java @@ -43,7 +43,6 @@ public class BundleActionQueryExecutor extends UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, GET_BUNDLE_ACTION, GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, - GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE }; @@ -99,9 +98,6 @@ public class BundleActionQueryExecutor extends case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: query.setParameter("bundleId", parameters[0]); break; - case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME: - query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); - break; case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: Timestamp ts = new Timestamp(System.currentTimeMillis() - (Long)parameters[0] * 1000); query.setParameter("lastModifiedTime", ts); @@ 
-146,10 +142,6 @@ public class BundleActionQueryExecutor extends case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: bean = (BundleActionBean) ret; break; - case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME: - bean = new BundleActionBean(); - bean.setBundleId((String) ret); - break; case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: bean = new BundleActionBean(); arr = (Object[]) ret; http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java index ce7473d..a770aad 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java @@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Date; import java.util.List; import javax.persistence.EntityManager; @@ -48,7 +49,8 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ GET_BUNDLE_JOB, GET_BUNDLE_JOB_STATUS, GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, - GET_BUNDLE_JOB_ID_JOBXML_CONF + GET_BUNDLE_JOB_ID_JOBXML_CONF, + GET_BUNDLE_IDS_FOR_STATUS_TRANSIT }; private static BundleJobQueryExecutor instance = new BundleJobQueryExecutor(); @@ -134,6 +136,9 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ case GET_BUNDLE_JOB_STATUS: query.setParameter("id", parameters[0]); break; + case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT: + query.setParameter("lastModifiedTime", DateUtils.convertDateToTimestamp((Date)parameters[0])); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -206,6 +211,12 @@ public class 
BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ bean.setJobXmlBlob((StringBlob) arr[1]); bean.setConfBlob((StringBlob) arr[2]); break; + + case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT: + bean = new BundleJobBean(); + bean.setId((String) ret); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index 0aee0e4..fc81a81 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -49,7 +49,9 @@ public class CoordActionQueryExecutor extends GET_COORD_ACTION, GET_COORD_ACTION_STATUS, GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, - GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME + GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, + GET_COORD_ACTIONS_STATUS_UNIGNORED, + GET_COORD_ACTIONS_PENDING_COUNT }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -170,6 +172,13 @@ public class CoordActionQueryExecutor extends case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); break; + case GET_COORD_ACTIONS_STATUS_UNIGNORED: + query.setParameter("jobId", parameters[0]); + break; + case GET_COORD_ACTIONS_PENDING_COUNT: + query.setParameter("jobId", parameters[0]); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + caQuery.name()); @@ -230,6 +239,13 @@ public class CoordActionQueryExecutor extends bean = new CoordinatorActionBean(); 
bean.setStatusStr((String)ret); break; + case GET_COORD_ACTIONS_STATUS_UNIGNORED: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setStatusStr((String)arr[0]); + bean.setPending((Integer)arr[1]); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " + namedQuery.name()); @@ -239,6 +255,13 @@ public class CoordActionQueryExecutor extends @Override public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { - throw new UnsupportedOperationException(); + JPAService jpaService = Services.get().get(JPAService.class); + EntityManager em = jpaService.getEntityManager(); + Query query = getSelectQuery(namedQuery, em, parameters); + Object ret = jpaService.executeGet(namedQuery.name(), query, em); + if (ret == null) { + throw new JPAExecutorException(ErrorCode.E0604, query.toString()); + } + return ret; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java deleted file mode 100644 index c96e5d9..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetPendingActionsCountJPAExecutor.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; -import org.apache.oozie.util.ParamChecker; - -/** - * Get the count of pending coordinator actions of a coordinator job - */ -public class CoordJobGetPendingActionsCountJPAExecutor implements JPAExecutor<Integer> { - - private String coordJobId = null; - - public CoordJobGetPendingActionsCountJPAExecutor(String coordJobId) { - ParamChecker.notNull(coordJobId, "coordJobId"); - this.coordJobId = coordJobId; - } - - @Override - public String getName() { - return "CoordJobGetPendingActionsCountJPAExecutor"; - } - - @Override - public Integer execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_COORD_ACTIONS_PENDING_COUNT"); - q.setParameter("jobId", coordJobId); - Long count = (Long) q.getSingleResult(); - return Integer.valueOf(count.intValue()); - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/b87686b7/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 169823b..4bccef4 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -64,7 +64,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_STATUS_PARENTID, GET_COORD_JOBS_CHANGED, GET_COORD_JOBS_OLDER_FOR_MATERILZATION, - GET_COORD_FOR_ABANDONEDCHECK + GET_COORD_FOR_ABANDONEDCHECK, + GET_COORD_IDS_FOR_STATUS_TRANSIT }; private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); @@ -214,6 +215,10 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo query.setParameter(2, (Timestamp) parameters[1]); break; + case GET_COORD_IDS_FOR_STATUS_TRANSIT: + query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -343,6 +348,11 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean.setAppName((String) arr[3]); break; + case GET_COORD_IDS_FOR_STATUS_TRANSIT: + bean = new CoordinatorJobBean(); + bean.setId((String) ret); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); @@ -382,5 +392,4 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { throw new UnsupportedOperationException(); } - }
