[
https://issues.apache.org/jira/browse/OOZIE-1885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
abhishek bafna updated OOZIE-1885:
----------------------------------
Fix Version/s: (was: trunk)
4.3.0
> Query optimization for StatusTransitService
> -------------------------------------------
>
> Key: OOZIE-1885
> URL: https://issues.apache.org/jira/browse/OOZIE-1885
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
> Assignee: Purshotam Shah
> Fix For: 4.3.0
>
>
> {code}
> private void coordTransit() throws JPAExecutorException, CommandException {
> List<CoordinatorJobBean> pendingJobCheckList = null;
> if (lastInstanceStartTime == null) {
> LOG.info("Running coordinator status service first instance");
> // this is the first instance, we need to check for all
> pending jobs;
> pendingJobCheckList = jpaService.execute(new
> CoordJobsGetPendingJPAExecutor(limit));
> }
> else {
> LOG.info("Running coordinator status service from last
> instance time = "
> + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
> // this is not the first instance, we should only check jobs
> // that have actions or jobs been
> // updated >= start time of last service run;
> List<CoordinatorActionBean> actionsList =
> CoordActionQueryExecutor.getInstance().getList(
>
> CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
> lastInstanceStartTime);
> Set<String> coordIds = new HashSet<String>();
> for (CoordinatorActionBean action : actionsList) {
> coordIds.add(action.getJobId());
> }
> pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
> for (String coordId : coordIds.toArray(new
> String[coordIds.size()])) {
> CoordinatorJobBean coordJob;
> try {
> coordJob =
> CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
> }
> catch (JPAExecutorException jpaee) {
> if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
> LOG.warn("Exception happened during
> StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
> continue;
> } else {
> throw jpaee;
> }
> }
> // Running coord job might have pending false
> Job.Status coordJobStatus = coordJob.getStatus();
> if ((coordJob.isPending() ||
> coordJobStatus.equals(Job.Status.PAUSED)
> || coordJobStatus.equals(Job.Status.RUNNING)
> ||
> coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
> ||
> coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
> && !coordJobStatus.equals(Job.Status.IGNORED)) {
> pendingJobCheckList.add(coordJob);
> }
> }
>
> pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
> CoordJobQuery.GET_COORD_JOBS_CHANGED,
> lastInstanceStartTime));
> }
> aggregateCoordJobsStatus(pendingJobCheckList);
> }
> }
> {code}
> This could be done in a single SQL query, something like:
> select w.id, w.status, w.pending from CoordinatorJobBean w where
> w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr =
> 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR' or w.statusStr =
> 'PAUSEDWITHERROR' and w.statusStr <> 'IGNORED') AND w.id in ( select a.jobId
> from CoordinatorActionBean a where a.lastModifiedTimestamp >=
> :lastModifiedTime group by a.jobId)
> Same for bundleTransit().
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)