[https://issues.apache.org/jira/browse/OOZIE-1885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Purshotam Shah updated OOZIE-1885:
----------------------------------

    Description: 
{code}
 private void coordTransit() throws JPAExecutorException, CommandException {
            List<CoordinatorJobBean> pendingJobCheckList = null;
            if (lastInstanceStartTime == null) {
                LOG.info("Running coordinator status service first instance");
                // this is the first instance, we need to check for all pending 
jobs;
                pendingJobCheckList = jpaService.execute(new 
CoordJobsGetPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running coordinator status service from last instance 
time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs
                // that have actions or jobs been
                // updated >= start time of last service run;
                List<CoordinatorActionBean> actionsList = 
CoordActionQueryExecutor.getInstance().getList(
                        
CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, 
lastInstanceStartTime);
                Set<String> coordIds = new HashSet<String>();
                for (CoordinatorActionBean action : actionsList) {
                    coordIds.add(action.getJobId());
                }

                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                for (String coordId : coordIds.toArray(new 
String[coordIds.size()])) {
                    CoordinatorJobBean coordJob;
                    try {
                        coordJob = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
                    }
                    catch (JPAExecutorException jpaee) {
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during 
StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
                            continue;
                        } else {
                            throw jpaee;
                        }
                    }
                    // Running coord job might have pending false
                    Job.Status coordJobStatus = coordJob.getStatus();
                    if ((coordJob.isPending() || 
coordJobStatus.equals(Job.Status.PAUSED)
                            || coordJobStatus.equals(Job.Status.RUNNING)
                            || 
coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || 
coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
                            && !coordJobStatus.equals(Job.Status.IGNORED)) {
                        pendingJobCheckList.add(coordJob);
                    }
                }
                
pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
                        CoordJobQuery.GET_COORD_JOBS_CHANGED, 
lastInstanceStartTime));
            }
            aggregateCoordJobsStatus(pendingJobCheckList);
        }
    }

{code}


This could be done in a single query, something like:
SELECT w.id, w.status, w.pending FROM CoordinatorJobBean w
WHERE w.startTimestamp <= :matTime
  AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR'
       OR w.statusStr = 'PAUSEDWITHERROR')
  AND w.statusStr <> 'IGNORED'
  AND w.id IN (SELECT a.jobId FROM CoordinatorActionBean a
               WHERE a.lastModifiedTimestamp >= :lastModifiedTime GROUP BY a.jobId)

(The status filter should match the one applied in the Java loop above: pending jobs, or jobs in PAUSED, RUNNING, RUNNINGWITHERROR or PAUSEDWITHERROR, excluding IGNORED.)

The same optimization applies to bundleTransit().

> Query optimization for StatusTransitService
> -------------------------------------------
>
>                 Key: OOZIE-1885
>                 URL: https://issues.apache.org/jira/browse/OOZIE-1885
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Purshotam Shah
>
> {code}
>  private void coordTransit() throws JPAExecutorException, CommandException {
>             List<CoordinatorJobBean> pendingJobCheckList = null;
>             if (lastInstanceStartTime == null) {
>                 LOG.info("Running coordinator status service first instance");
>                 // this is the first instance, we need to check for all 
> pending jobs;
>                 pendingJobCheckList = jpaService.execute(new 
> CoordJobsGetPendingJPAExecutor(limit));
>             }
>             else {
>                 LOG.info("Running coordinator status service from last 
> instance time =  "
>                         + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
>                 // this is not the first instance, we should only check jobs
>                 // that have actions or jobs been
>                 // updated >= start time of last service run;
>                 List<CoordinatorActionBean> actionsList = 
> CoordActionQueryExecutor.getInstance().getList(
>                         
> CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, 
> lastInstanceStartTime);
>                 Set<String> coordIds = new HashSet<String>();
>                 for (CoordinatorActionBean action : actionsList) {
>                     coordIds.add(action.getJobId());
>                 }
>                 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
>                 for (String coordId : coordIds.toArray(new 
> String[coordIds.size()])) {
>                     CoordinatorJobBean coordJob;
>                     try {
>                         coordJob = 
> CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
>                     }
>                     catch (JPAExecutorException jpaee) {
>                         if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
>                             LOG.warn("Exception happened during 
> StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
>                             continue;
>                         } else {
>                             throw jpaee;
>                         }
>                     }
>                     // Running coord job might have pending false
>                     Job.Status coordJobStatus = coordJob.getStatus();
>                     if ((coordJob.isPending() || 
> coordJobStatus.equals(Job.Status.PAUSED)
>                             || coordJobStatus.equals(Job.Status.RUNNING)
>                             || 
> coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
>                             || 
> coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
>                             && !coordJobStatus.equals(Job.Status.IGNORED)) {
>                         pendingJobCheckList.add(coordJob);
>                     }
>                 }
>                 
> pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
>                         CoordJobQuery.GET_COORD_JOBS_CHANGED, 
> lastInstanceStartTime));
>             }
>             aggregateCoordJobsStatus(pendingJobCheckList);
>         }
>     }
> {code}
> This could be done in a single query, something like:
> SELECT w.id, w.status, w.pending FROM CoordinatorJobBean w
> WHERE w.startTimestamp <= :matTime
>   AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR'
>        OR w.statusStr = 'PAUSEDWITHERROR')
>   AND w.statusStr <> 'IGNORED'
>   AND w.id IN (SELECT a.jobId FROM CoordinatorActionBean a
>                WHERE a.lastModifiedTimestamp >= :lastModifiedTime GROUP BY a.jobId)
> (The status filter should match the one applied in the Java loop above: pending jobs, or jobs in PAUSED, RUNNING, RUNNINGWITHERROR or PAUSEDWITHERROR, excluding IGNORED.)
> The same optimization applies to bundleTransit().



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to