Repository: ambari Updated Branches: refs/heads/branch-2.5 86b86d458 -> 4149aaa5e
AMBARI-19062 : in hive view directly calling DB and removed ATS calls when fetching history data. (nitirajrathore) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4149aaa5 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4149aaa5 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4149aaa5 Branch: refs/heads/branch-2.5 Commit: 4149aaa5e03d519256bd0735b6498cb846c77a47 Parents: 86b86d4 Author: Nitiraj Rathore <[email protected]> Authored: Fri Dec 30 13:38:41 2016 +0530 Committer: Nitiraj Rathore <[email protected]> Committed: Fri Dec 30 13:39:26 2016 +0530 ---------------------------------------------------------------------- .../view/hive2/resources/jobs/Aggregator.java | 137 ++++++++++--------- 1 file changed, 73 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4149aaa5/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java index 99faeca..d399c47 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java @@ -19,11 +19,15 @@ package org.apache.ambari.view.hive2.resources.jobs; import akka.actor.ActorRef; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; +import com.google.common.collect.FluentIterable; import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation; import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy; import org.apache.ambari.view.hive2.persistence.utils.Indexed; import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive2.persistence.utils.OnlyOwnersFilteringStrategy; import org.apache.ambari.view.hive2.resources.IResourceManager; import org.apache.ambari.view.hive2.resources.files.FileService; import org.apache.ambari.view.hive2.resources.jobs.atsJobs.HiveQueryId; @@ -36,8 +40,8 @@ import org.apache.commons.beanutils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -84,13 +88,9 @@ public class Aggregator { * @param endTime: exclusive, time in secs from epoch * @return: list of jobs */ - public List<Job> readAllForUserByTime(String username, long startTime, long endTime) { - List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime); - List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); - List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime); - allJobs.addAll(dbOnlyJobs); - - return allJobs; + public List<Job> readAllForUserByTime(String username, Long startTime, Long endTime) { + List<Job> jobs = readDBJobs(username, startTime, endTime); + return jobs; } /** @@ -98,25 +98,22 @@ public class Aggregator { * @param jobInfos: infos of job to get * @return: list of updated Job */ - public List<Job> readJobsByIds(List<JobInfo> jobInfos) { - //categorize jobs - List<String> jobsWithHiveIds = new LinkedList<>(); - List<String> dbOnlyJobs = new LinkedList<>(); - - for (JobInfo jobInfo : jobInfos) { - if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) { - dbOnlyJobs.add(jobInfo.getJobId()); - } else { - jobsWithHiveIds.add(jobInfo.getHiveId()); + public List<Job> readJobsByIds(final List<JobInfo> jobInfos) { + List<String> jobIds = FluentIterable.from(jobInfos).filter(new Predicate<JobInfo>() { + @Override + public boolean apply(@Nullable JobInfo input) { + return !Strings.isNullOrEmpty(input.getJobId()); } - } - - List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds); - List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); - List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs); - - allJobs.addAll(dbJobs); - return allJobs; + }).transform(new Function<JobInfo, String>() { + @Nullable + @Override + public String apply(@Nullable JobInfo input) { + return input.getJobId(); + } + }).toList(); + List<Job> dbJobs = readJobsFromDbByJobId(jobIds); + LOG.debug("readJobsByIds: dbJobs : {}", dbJobs); + return dbJobs; } /** @@ -124,17 +121,24 @@ public class Aggregator { * @param jobsIds: list of ids of jobs * @return: list of all the jobs found */ - private List<Job> readJobsFromDbByJobId(List<String> jobsIds) { - List<Job> jobs = new LinkedList<>(); - for (final String jid : jobsIds) { - try { - Job job = getJobFromDbByJobId(jid); - jobs.add(job); - } catch (ItemNotFound itemNotFound) { - LOG.error("Error while finding job with id : {}", jid, itemNotFound); + private List<Job> readJobsFromDbByJobId(final List<String> jobsIds) { + LOG.info("Reading jobs from db with ids : {} ", jobsIds); + List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + JobImpl job = (JobImpl) item; + return jobsIds.contains(job.getId()); } - } + @Override + public String whereStatement() { + String query = " id in ( " + Joiner.on(",").join(jobsIds) + " ) "; + LOG.debug("where clause for jobsIds : {}", query); + return query; + } + }); + + LOG.debug("jobs returned from DB : {}" , jobs); return jobs; } @@ -172,44 +176,49 @@ public class Aggregator { * @return */ public List<Job> readAll(String username) { - List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username); - LOG.debug("HiveQueryIds fetched : {}", queries); - List<Job> allJobs = fetchDagsAndMergeJobs(queries); - List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null); - LOG.debug("Jobs only present in DB: {}", dbOnlyJobs); - allJobs.addAll(dbOnlyJobs); - return allJobs; + return readAllForUserByTime(username, null, null); } /** * reads all the jobs from DB for username and excludes the jobs mentioned in queries list * @param username : username for which the jobs are to be read. - * @param queries : the jobs to exclude * @param startTime: can be null, if not then the window start time for job * @param endTime: can be null, if not then the window end time for job * @return : the jobs in db that are not in the queries */ - private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) { - List<Job> dbOnlyJobs = new LinkedList<>(); - HashMap<String, String> operationIdVsHiveId = new HashMap<>(); - - for (HiveQueryId hqid : queries) { - operationIdVsHiveId.put(hqid.operationId, hqid.entity); - } - LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId); - //cover case when operationId is present, but not exists in ATS - //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE" - List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username)); - for (Job job : jobs) { - if (null != startTime && null != endTime && null != job.getDateSubmitted() - && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid())) - ) { - continue; // don't include this in the result - } else { - dbOnlyJobs.add(job); + private List<Job> readDBJobs(final String username, final Long startTime, final Long endTime) { + List<Job> jobs = viewJobResourceManager.readAll( new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + JobImpl job = (JobImpl) item; + return job.getOwner().compareTo(username) == 0 && + ( (null == startTime || job.getDateSubmitted() >= startTime ) && + ( null == endTime || job.getDateSubmitted() < endTime ) + ); } - } - return dbOnlyJobs; + @Override + public String whereStatement() { + StringBuilder sb = new StringBuilder( "owner = '" ).append( username ).append( "'" ); + if( null != startTime || null != endTime ) { + sb.append(" AND ( " ); + if( null != startTime ) { + sb.append( " dateSubmitted >= " ).append( startTime ); + } + if( null != endTime ){ + if(null != startTime){ + sb.append(" AND "); + } + sb.append(" dateSubmitted < ").append(endTime); + } + sb.append( " ) " ); + } + String where = sb.toString(); + LOG.debug("where statement : {}", where); + return where; + } + }); + LOG.debug("returning jobs: {}", jobs); + return jobs; } private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) {
