Repository: ambari Updated Branches: refs/heads/trunk 31bc2d529 -> e88ca22cf
AMBARI-16980 : History tab takes long to populate when there are many entries in the history table. (Nitiraj Rathore via dipayanb) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e88ca22c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e88ca22c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e88ca22c Branch: refs/heads/trunk Commit: e88ca22cfb7f97c36886a925d8197f1fee477070 Parents: 31bc2d5 Author: Dipayan Bhowmick <[email protected]> Authored: Thu Jun 9 17:49:07 2016 +0530 Committer: Dipayan Bhowmick <[email protected]> Committed: Thu Jun 9 17:49:07 2016 +0530 ---------------------------------------------------------------------- .../hive/persistence/utils/ItemNotFound.java | 18 ++ .../view/hive/resources/jobs/Aggregator.java | 284 +++++++++++++++---- .../view/hive/resources/jobs/JobService.java | 85 +++++- .../jobs/OperationHandleResourceManager.java | 12 +- .../hive/resources/jobs/atsJobs/ATSParser.java | 82 +++++- .../jobs/atsJobs/ATSRequestsDelegate.java | 6 +- .../jobs/atsJobs/ATSRequestsDelegateImpl.java | 35 ++- .../hive/resources/jobs/atsJobs/IATSParser.java | 8 +- .../view/hive/resources/jobs/viewJobs/Job.java | 23 +- .../jobs/viewJobs/JobControllerImpl.java | 28 +- .../hive/resources/jobs/viewJobs/JobImpl.java | 17 +- .../hive/resources/jobs/viewJobs/JobInfo.java | 78 +++++ .../app/components/number-range-widget.js | 15 +- .../ui/hive-web/app/controllers/history.js | 201 +++++++++---- .../ui/hive-web/app/initializers/i18n.js | 3 +- .../resources/ui/hive-web/app/models/job.js | 3 +- .../resources/ui/hive-web/app/routes/history.js | 16 +- .../ui/hive-web/app/services/history.js | 204 +++++++++++++ .../ui/hive-web/app/templates/history.hbs | 68 ++--- .../ui/hive-web/app/utils/constants.js | 2 +- .../view/hive/resources/jobs/ATSParserTest.java | 28 +- .../hive/resources/jobs/AggregatorTest.java | 69 ++++- 22 files changed, 1049 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/persistence/utils/ItemNotFound.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/persistence/utils/ItemNotFound.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/persistence/utils/ItemNotFound.java index 3b7e51a..06976b9 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/persistence/utils/ItemNotFound.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/persistence/utils/ItemNotFound.java @@ -22,4 +22,22 @@ package org.apache.ambari.view.hive.persistence.utils; * Thrown when item was not found in DB */ public class ItemNotFound extends Exception { + public ItemNotFound() { + } + + public ItemNotFound(String message) { + super(message); + } + + public ItemNotFound(String message, Throwable cause) { + super(message, cause); + } + + public ItemNotFound(Throwable cause) { + super(cause); + } + + public ItemNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java index f119ff3..5164a4d 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java @@ -29,6 +29,7 @@ import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezDagId; import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobInfo; import org.apache.commons.beanutils.PropertyUtils; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; @@ -37,10 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Set; /** * View Jobs and ATS Jobs aggregator. @@ -49,7 +49,7 @@ import java.util.Set; * *Meaning*: executed outside of HS2 * - Job info only from ATS * 2) ATS Job with operationId - * a) Hive View Job with same operationId is not present + * a) Hive View Job with same operationId is not present * *Meaning*: executed with HS2 * - Job info only from ATS * b) Hive View Job with operationId is present (need to merge) @@ -62,7 +62,7 @@ import java.util.Set; */ public class Aggregator { protected final static Logger LOG = - LoggerFactory.getLogger(Aggregator.class); + LoggerFactory.getLogger(Aggregator.class); private final IATSParser ats; private final IOperationHandleResourceManager operationHandleResourceManager; @@ -76,53 +76,232 @@ public class Aggregator { this.ats = ats; } - public List<Job> readAll(String username) { - Set<String> addedOperationIds = new HashSet<String>(); - - List<Job> allJobs = new LinkedList<Job>(); - List<HiveQueryId> queries = ats.getHiveQueryIdsList(username); - for (HiveQueryId atsHiveQuery : queries) { - TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + /** + * gets all the jobs for 'username' where the job submission time is between 'startTime' (inclusive) + * and 'endTime' (exclusive). + * Fetches the jobs from ATS and the DB, merges them, updates the DB where required, and returns the combined list. + * + * @param username: username for which jobs have to be fetched. 
+ * @param startTime: inclusive, time in milliseconds from epoch + * @param endTime: exclusive, time in milliseconds from epoch + * @return: list of jobs + */ + public List<Job> readAllForUserByTime(String username, long startTime, long endTime) { + List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime); + List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); + List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime); + allJobs.addAll(dbOnlyJobs); - JobImpl atsJob; - if (hasOperationId(atsHiveQuery)) { - try { - Job viewJob = getJobByOperationId(urlSafeBase64ToHexString(atsHiveQuery.operationId)); - saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob); + return allJobs; + } - atsJob = mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); - } catch (ItemNotFound itemNotFound) { - // Executed from HS2, but outside of Hive View - atsJob = atsOnlyJob(atsHiveQuery, atsTezDag); - } + /** + * fetches the new state of jobs from ATS and from the DB. Does merging/updating as required. + * @param jobInfos: info of the jobs to fetch + * @return: list of updated jobs + */ + public List<Job> readJobsByIds(List<JobInfo> jobInfos) { + //categorize jobs + List<String> jobsWithHiveIds = new LinkedList<>(); + List<String> dbOnlyJobs = new LinkedList<>(); + + for (JobInfo jobInfo : jobInfos) { + if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) { + dbOnlyJobs.add(jobInfo.getJobId()); } else { - atsJob = atsOnlyJob(atsHiveQuery, atsTezDag); + jobsWithHiveIds.add(jobInfo.getHiveId()); } - allJobs.add(atsJob); + } + + List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds); + List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); + List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs); + + allJobs.addAll(dbJobs); + return allJobs; + } - addedOperationIds.add(atsHiveQuery.operationId); + /** + * gets the jobs from the database given their ids + * @param jobsIds: list of ids of jobs + * @return: list of all the jobs found + */ + private List<Job> readJobsFromDbByJobId(List<String> jobsIds) { + List<Job> jobs = new LinkedList<>(); + for (final String jid : jobsIds) { + try { + Job job = getJobFromDbByJobId(jid); + jobs.add(job); + } catch (ItemNotFound itemNotFound) { + LOG.error("Error while finding job with id : {}", jid, itemNotFound); + } } + + return jobs; + } + + /** + * fetches the job from the DB given its id + * @param jobId: the id of the job to fetch + * @return: the job + * @throws ItemNotFound: if no job with the given id is found in the DB + */ + private Job getJobFromDbByJobId(final String jobId) throws ItemNotFound { + if (null == jobId) + return null; + + List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + return item.getId().equals(jobId); + } + + @Override + public String whereStatement() { + return "id = '" + jobId + "'"; // even IDs are strings + } + }); + + if (null != jobs && !jobs.isEmpty()) + return jobs.get(0); + + throw new ItemNotFound(String.format("Job with id %s not found.", jobId)); + } + + /** + * returns the job which is associated with the given operationId (guid) + * @param optId: the operationId for which the job needs to be fetched. + * @return: the job + * @throws ItemNotFound: if no job was found to be associated with the given operationId (guid) or if no + * StoredOperationHandle is found with guid optId. 
+ */ + private Job getJobFromDbByOperationId(final String optId) throws ItemNotFound { + StoredOperationHandle handle = getStoredOperationHandleByGuid(optId); + Job job = operationHandleResourceManager.getJobByHandle(handle); + return job; + } + + /** + * returns the StoredOperationHandle with the given id + * @param optId: id of the StoredOperationHandle + * @return: StoredOperationHandle + * @throws ItemNotFound: if no StoredOperationHandle is found for the given guid (operationId). + */ + private StoredOperationHandle getStoredOperationHandleByGuid(final String optId) throws ItemNotFound { + LOG.debug("Fetching stored operation handle with guid : {} from DB", optId); + List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readAll(new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + StoredOperationHandle soh = (StoredOperationHandle) item; + return soh.getGuid().equals(optId); + } + + @Override + public String whereStatement() { + return " guid = '" + optId + "'"; + } + }); + + if (null != operationHandles && !operationHandles.isEmpty()) { + return operationHandles.get(0); + } + + throw new ItemNotFound(String.format("Stored operation handle with id %s not found", optId)); + } + + /** + * returns all the jobs from ATS and DB (for this instance) for the given user. + * @param username + * @return + */ + public List<Job> readAll(String username) { + List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username); + LOG.debug("HiveQueryIds fetched : {}", queries); + List<Job> allJobs = fetchDagsAndMergeJobs(queries); + List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null); + LOG.debug("Jobs only present in DB: {}", dbOnlyJobs); + allJobs.addAll(dbOnlyJobs); + return allJobs; + } + + /** + * reads all the jobs from the DB for username and excludes the jobs present in the queries list + * @param username : username for which the jobs are to be read. + * @param queries : the jobs to exclude + * @param startTime: may be null; if not, the inclusive start of the job submission time window + * @param endTime: may be null; if not, the exclusive end of the job submission time window + * @return : the jobs in the DB that are not in the queries list + */ + private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) { + List<Job> dbOnlyJobs = new LinkedList<Job>(); + HashMap<String, String> operationIdVsHiveId = new HashMap<>(); + + for (HiveQueryId hqid : queries) { + operationIdVsHiveId.put(hqid.operationId, hqid.entity); + } + LOG.info("operationIdVsHiveId : {} ", operationIdVsHiveId); //cover case when operationId is present, but not exists in ATS //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE" List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username)); - for (Job job : jobs ) { + for (Job job : jobs) { + if (null != startTime && null != endTime && null != job.getDateSubmitted() + && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime) + ) { + continue; // don't include this in the result + } + List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readJobRelatedHandles(job); if (operationHandles.size() > 0) { StoredOperationHandle operationHandle = operationHandles.get(0); - if (!addedOperationIds.contains(hexStringToUrlSafeBase64(operationHandle.getGuid()))) { + if (!operationIdVsHiveId.containsKey(hexStringToUrlSafeBase64(operationHandle.getGuid()))) { //e.g. 
query without hadoop job: select * from table - allJobs.add(job); + dbOnlyJobs.add(job); + } + } + } + + return dbOnlyJobs; + } + + private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) { + List<Job> allJobs = new LinkedList<Job>(); + + for (HiveQueryId atsHiveQuery : queries) { + JobImpl atsJob = null; + if (hasOperationId(atsHiveQuery)) { + try { + Job viewJob = getJobFromDbByOperationId(urlSafeBase64ToHexString(atsHiveQuery.operationId)); + TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + atsJob = mergeHiveAtsTez(atsHiveQuery, atsTezDag, viewJob); + } catch (ItemNotFound itemNotFound) { + LOG.error("Ignore : {}", itemNotFound.getMessage()); + continue; } - }else{ - LOG.error("operationHandle not found for job : {}", job.getId()); + } else { + TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + atsJob = atsOnlyJob(atsHiveQuery, atsTezDag); } + + atsJob.setHiveQueryId(atsHiveQuery.entity); + allJobs.add(atsJob); } return allJobs; } + /** + * merges the ATS Hive query and Tez DAG info into the view job, updating the DB if required. + * @param atsHiveQuery + * @param atsTezDag + * @param viewJob + * @return + */ + private JobImpl mergeHiveAtsTez(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) throws ItemNotFound { + saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob); + return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); + } + public Job readATSJob(Job viewJob) throws ItemNotFound { TOperationHandle operationHandle = operationHandleResourceManager.getHandleForJob(viewJob).toTOperationHandle(); @@ -157,13 +336,13 @@ public class Aggregator { JobImpl atsJob; try { atsJob = new JobImpl(PropertyUtils.describe(viewJob)); - }catch(IllegalAccessException e){ + } catch (IllegalAccessException e) { LOG.error("Can't instantiate JobImpl", e); return null; - }catch(InvocationTargetException e){ + } catch (InvocationTargetException e) { LOG.error("Can't instantiate JobImpl", e); return null; - }catch(NoSuchMethodException e){ + } catch (NoSuchMethodException e) { LOG.error("Can't instantiate JobImpl", e); return null; } @@ -172,16 +351,21 @@ public class Aggregator { } protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound { + boolean shouldUpdate = false; if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) { if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) { viewJob.setDagName(hiveQueryId.dagNames.get(0)); - viewJobResourceManager.update(viewJob, viewJob.getId()); + shouldUpdate = true; } } if (tezDagId.status != null && (tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) && - !viewJob.getStatus().equals(tezDagId.status)) { + !viewJob.getStatus().equals(tezDagId.status)) { viewJob.setDagId(tezDagId.entity); viewJob.setStatus(tezDagId.status); + shouldUpdate = true; + } + + if (shouldUpdate) { viewJobResourceManager.update(viewJob, viewJob.getId()); } } @@ -192,7 +376,7 @@ public class Aggregator { fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag); String query = atsHiveQuery.query; - atsJob.setTitle(query.substring(0, (query.length() > 42)?42:query.length())); + atsJob.setTitle(query.substring(0, (query.length() > 42) ? 42 : query.length())); atsJob.setQueryFile(FileService.JSON_PATH_FILE + atsHiveQuery.url + "#otherinfo.QUERY!queryText"); return atsJob; @@ -212,41 +396,21 @@ public class Aggregator { return atsJob; } - protected Job getJobByOperationId(final String opId) throws ItemNotFound { - List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readAll(new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - StoredOperationHandle opHandle = (StoredOperationHandle) item; - return opHandle.getGuid().equals(opId); - } - - @Override - public String whereStatement() { - return "guid='" + opId + "'"; - } - }); - - if (operationHandles.size() != 1) - throw new ItemNotFound(); - - return viewJobResourceManager.read(operationHandles.get(0).getJobId()); - } - - protected static String urlSafeBase64ToHexString(String urlsafeBase64){ + protected static String urlSafeBase64ToHexString(String urlsafeBase64) { byte[] decoded = Base64.decodeBase64(urlsafeBase64); StringBuilder sb = new StringBuilder(); - for(byte b : decoded){ + for (byte b : decoded) { sb.append(String.format("%02x", b)); } return sb.toString(); } - protected static String hexStringToUrlSafeBase64(String hexString){ + protected static String hexStringToUrlSafeBase64(String hexString) { byte[] decoded = new byte[hexString.length() / 2]; - for(int i=0; i<hexString.length(); i+=2) { - decoded[i / 2] = (byte) Integer.parseInt(String.format("%c%c", hexString.charAt(i), hexString.charAt(i+1)), 16); + for (int i = 0; i < hexString.length(); i += 2) { + decoded[i / 2] = (byte) Integer.parseInt(String.format("%c%c", hexString.charAt(i), hexString.charAt(i + 1)), 16); } return Base64.encodeBase64URLSafeString(decoded); }
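[Note: the two helpers above convert a HiveServer2 operation GUID between the hex form the view stores in its database and the URL-safe Base64 form ATS reports as the operationid filter. A minimal, self-contained sketch of that round trip; the sample operationId is made up, and commons-codec on the classpath is assumed:

import org.apache.commons.codec.binary.Base64;

public class OperationIdRoundTrip {
  // URL-safe Base64 (ATS form) -> lowercase hex (DB form)
  static String toHex(String urlSafeBase64) {
    byte[] decoded = Base64.decodeBase64(urlSafeBase64);
    StringBuilder sb = new StringBuilder();
    for (byte b : decoded) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  // lowercase hex (DB form) -> URL-safe Base64 (ATS form)
  static String toUrlSafeBase64(String hex) {
    byte[] decoded = new byte[hex.length() / 2];
    for (int i = 0; i < hex.length(); i += 2) {
      decoded[i / 2] = (byte) Integer.parseInt(hex.substring(i, i + 2), 16);
    }
    return Base64.encodeBase64URLSafeString(decoded);
  }

  public static void main(String[] args) {
    String operationId = "HWvpmKaERd-Xk1vYAACxxw"; // hypothetical ATS operationId
    String hex = toHex(operationId);               // form used to query the view's DB
    System.out.println(hex);
    System.out.println(toUrlSafeBase64(hex));      // round-trips back to the ATS form
  }
}

The hex form matches the guid of the stored operation handles, which is why readDBOnlyJobs converts each handle's guid with hexStringToUrlSafeBase64 before checking it against the operation ids seen in ATS.]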
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java index 34f060d..a540ca0 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java @@ -21,11 +21,23 @@ package org.apache.ambari.view.hive.resources.jobs; import org.apache.ambari.view.ViewResourceHandler; import org.apache.ambari.view.hive.BaseService; import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController; -import org.apache.ambari.view.hive.client.*; +import org.apache.ambari.view.hive.client.Connection; +import org.apache.ambari.view.hive.client.Cursor; +import org.apache.ambari.view.hive.client.HiveAuthCredentials; +import org.apache.ambari.view.hive.client.HiveClientException; +import org.apache.ambari.view.hive.client.UserLocalConnection; +import org.apache.ambari.view.hive.client.UserLocalHiveAuthCredentials; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.*; -import org.apache.ambari.view.hive.utils.*; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobController; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; +import 
org.apache.ambari.view.hive.resources.jobs.viewJobs.JobInfo; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobResourceManager; +import org.apache.ambari.view.hive.utils.MisconfigurationFormattedException; +import org.apache.ambari.view.hive.utils.NotFoundFormattedException; +import org.apache.ambari.view.hive.utils.ServiceFormattedException; +import org.apache.ambari.view.hive.utils.SharedObjectsFactory; import org.apache.commons.beanutils.PropertyUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -36,11 +48,28 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.*; -import java.io.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.UriInfo; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.lang.reflect.InvocationTargetException; -import java.util.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; /** @@ -397,20 +426,50 @@ public class JobService extends BaseService { */ @GET @Produces(MediaType.APPLICATION_JSON) - public Response getList() { + public List<Job> getList(@QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) { try { - LOG.debug("Getting all job"); - List<Job> allJobs = getAggregator().readAll(context.getUsername()); + + LOG.debug("Getting all jobs: startTime: {}, endTime: {}",startTime,endTime); + List<Job> allJobs = getAggregator().readAllForUserByTime(context.getUsername(),startTime, endTime); for(Job job : allJobs) { job.setSessionTag(null); } - JSONObject object = new JSONObject(); - object.put("jobs", allJobs); - return Response.ok(object).build(); + LOG.info("allJobs : {}", allJobs); + return allJobs; + } catch (WebApplicationException ex) { + LOG.error("Exception occurred while fetching all jobs.", ex); + throw ex; + } catch (Exception ex) { + LOG.error("Exception occurred while fetching all jobs.", ex); + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * fetches the jobs with the given info. + * provide as much info about each job as possible so that the api can optimize the fetch process. + * @param jobInfos + * @return + */ + @Path("/getList") + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public List<Job> getList(List<JobInfo> jobInfos) { + try { + LOG.debug("fetching jobs with ids :{}", jobInfos); + List<Job> allJobs = getAggregator().readJobsByIds(jobInfos); + for(Job job : allJobs) { + job.setSessionTag(null); + } + + return allJobs; } catch (WebApplicationException ex) { + LOG.error("Exception occurred while fetching all jobs.", ex); throw ex; } catch (Exception ex) { + LOG.error("Exception occurred while fetching all jobs.", ex); throw new ServiceFormattedException(ex.getMessage(), ex); } }
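[Note: the GET endpoint above now takes a submission-time window, and the new POST /getList endpoint refreshes a known set of jobs. A hedged sketch of exercising both from a plain Java client; the view's resource root, the ids in the POST body, and the omitted authentication are illustrative assumptions, not values from this patch:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobListClientSketch {
  // hypothetical Hive view instance resource root
  static final String BASE =
      "http://ambari-host:8080/api/v1/views/HIVE/versions/1.0.0/instances/HIVE_1/resources/jobs";

  public static void main(String[] args) throws Exception {
    // GET /jobs?startTime=..&endTime=.. -- window bounds in milliseconds since epoch
    long endTime = System.currentTimeMillis();
    long startTime = endTime - 5L * 24 * 60 * 60 * 1000; // e.g. the last five days
    HttpURLConnection get = (HttpURLConnection) new URL(
        BASE + "?startTime=" + startTime + "&endTime=" + endTime).openConnection();
    get.setRequestProperty("X-Requested-By", "ambari");
    System.out.println("GET /jobs -> " + get.getResponseCode());

    // POST /jobs/getList -- refresh known jobs; each entry mirrors the JobInfo bean
    HttpURLConnection post =
        (HttpURLConnection) new URL(BASE + "/getList").openConnection();
    post.setRequestMethod("POST");
    post.setDoOutput(true);
    post.setRequestProperty("Content-Type", "application/json");
    post.setRequestProperty("X-Requested-By", "ambari");
    String body = "[{\"jobId\":\"42\",\"hiveId\":\"hive_sample_entity\","
        + "\"dagId\":\"\",\"operationId\":\"\"}]"; // made-up ids
    try (OutputStream os = post.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("POST /jobs/getList -> " + post.getResponseCode());
  }
}

This mirrors what the new Ember history service does: it sends the window bounds in milliseconds on the initial load, then posts only the unfinished jobs' info on each refresh.]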
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java index f0eecea..c53cad5 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java @@ -24,9 +24,11 @@ import org.apache.ambari.view.hive.persistence.utils.Indexed; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; import org.apache.ambari.view.hive.resources.SharedCRUDResourceManager; import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; import org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.hive.service.cli.thrift.TOperationHandle; +import java.util.LinkedList; import java.util.List; public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle> @@ -64,7 +66,7 @@ public class OperationHandleResourceManager extends SharedCRUDResourceManager<St @Override public List<Job> getHandleRelatedJobs(final StoredOperationHandle operationHandle) { - return storageFactory.getStorage().loadAll(Job.class, new FilteringStrategy() { + List<JobImpl> list = storageFactory.getStorage().loadAll(JobImpl.class, new FilteringStrategy() { @Override public boolean isConform(Indexed item) { Job job = (Job) item; @@ -76,13 +78,19 @@ public class OperationHandleResourceManager extends SharedCRUDResourceManager<St return "id = '" + operationHandle.getJobId() + "'"; } }); + + if(null == list) + return null; + + List<Job> jobs = new LinkedList<Job>(list); + return jobs; } @Override public Job getJobByHandle(StoredOperationHandle handle) throws ItemNotFound { List<Job> handleRelatedJobs = getHandleRelatedJobs(handle); if (handleRelatedJobs.size() == 0) - throw new ItemNotFound(); + throw new ItemNotFound(String.format("Job not found for operation handle %s", handle)); return handleRelatedJobs.get(0); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java index c4c85ad..b145df2 
100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java @@ -32,7 +32,7 @@ import java.util.List; */ public class ATSParser implements IATSParser { protected final static Logger LOG = - LoggerFactory.getLogger(ATSParser.class); + LoggerFactory.getLogger(ATSParser.class); private ATSRequestsDelegate delegate; @@ -42,13 +42,36 @@ public class ATSParser implements IATSParser { this.delegate = delegate; } + /** + * returns all HiveQueryIDs from ATS for the given user. + * @param username + * @return + */ @Override - public List<HiveQueryId> getHiveQueryIdsList(String username) { - JSONObject entities = delegate.hiveQueryIdList(username); + public List<HiveQueryId> getHiveQueryIdsForUser(String username) { + JSONObject entities = delegate.hiveQueryIdsForUser(username); + return parseHqidJsonFromATS(entities); + } + + /** + * parses the JSONArray of hive query IDs + * @param entities: should contain 'entities' element as JSONArray + * @return + */ + private List<HiveQueryId> parseHqidJsonFromATS(JSONObject entities) { JSONArray jobs = (JSONArray) entities.get("entities"); - List<HiveQueryId> parsedJobs = new LinkedList<HiveQueryId>(); - for(Object job : jobs) { + return getHqidListFromJsonArray(jobs); + } + + /** + * parses List of HiveQueryIds from JSON + * @param jobs + * @return + */ + private List<HiveQueryId> getHqidListFromJsonArray(JSONArray jobs) { + List<HiveQueryId> parsedJobs = new LinkedList<>(); + for (Object job : jobs) { try { HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job); parsedJobs.add(parsedJob); @@ -81,9 +104,12 @@ public class ATSParser implements IATSParser { @Override public HiveQueryId getHiveQueryIdByOperationId(String guidString) { JSONObject entities = delegate.hiveQueryIdByOperationId(guidString); + return getHiveQueryIdFromJson(entities); + } + + private HiveQueryId getHiveQueryIdFromJson(JSONObject entities) { JSONArray jobs = (JSONArray) entities.get("entities"); - assert jobs.size() <= 1; if (jobs.size() == 0) { return new HiveQueryId(); } @@ -91,6 +117,18 @@ public class ATSParser implements IATSParser { return parseAtsHiveJob((JSONObject) jobs.get(0)); } + /** + * returns the hive entity from ATS; an empty object if not found. 
+ * + * @param hiveId: the entityId of the hive query + * @return: the HiveQueryId; an empty entity if not found + */ + @Override + public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveId) { + JSONObject entity = delegate.hiveQueryEntityByEntityId(hiveId); + return parseAtsHiveJob(entity); + } + @Override public TezDagId getTezDAGByName(String name) { JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities"); @@ -103,6 +141,32 @@ public class ATSParser implements IATSParser { return parseTezDag(tezDagEntities); } + /** + * fetches the HIVE_QUERY_ID entities from ATS for the given user within the given time window + * + * @param username: username for which to fetch hive query IDs + * @param startTime: time in milliseconds, inclusive + * @param endTime: time in milliseconds, exclusive + * @return: List of HIVE_QUERY_ID + */ + @Override + public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) { + JSONObject entities = delegate.hiveQueryIdsForUserByTime(username, startTime, endTime); + return parseHqidJsonFromATS(entities); + } + + @Override + public List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveIds) { + List<HiveQueryId> hiveQueryIds = new LinkedList<>(); + for (String id : hiveIds) { + HiveQueryId hqi = this.getHiveQueryIdByHiveEntityId(id); + if (null != hqi.entity) { + hiveQueryIds.add(hqi); + } + } + return hiveQueryIds; + } + private TezDagId parseTezDag(JSONArray tezDagEntities) { assert tezDagEntities.size() <= 1; if (tezDagEntities.size() == 0) { @@ -123,7 +187,7 @@ public class ATSParser implements IATSParser { parsedJob.entity = (String) job.get("entity"); parsedJob.url = delegate.hiveQueryIdDirectUrl((String) job.get("entity")); - parsedJob.starttime = ((Long) job.get("starttime")) / MillisInSecond; + parsedJob.starttime = ((Long) job.get("starttime")); JSONObject primaryfilters = (JSONObject) job.get("primaryfilters"); JSONArray operationIds = (JSONArray) primaryfilters.get("operationid"); @@ -136,9 +200,9 @@ public class ATSParser implements IATSParser { } JSONObject lastEvent = getLastEvent(job); - long lastEventTimestamp = ((Long) lastEvent.get("timestamp")) / MillisInSecond; + long lastEventTimestamp = ((Long) lastEvent.get("timestamp")); - parsedJob.duration = lastEventTimestamp - parsedJob.starttime; + parsedJob.duration = (lastEventTimestamp - parsedJob.starttime) / MillisInSecond; JSONObject otherinfo = (JSONObject) job.get("otherinfo"); if (otherinfo.get("QUERY") != null) { // workaround for HIVE-10829 http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java index 02091f8..6de5773 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java @@ -31,7 +31,7 @@ public interface ATSRequestsDelegate { String tezVerticesListForDAGUrl(String dagId); - JSONObject hiveQueryIdList(String username); + JSONObject hiveQueryIdsForUser(String username); JSONObject hiveQueryIdByOperationId(String operationId); @@ -40,4 +40,8 @@ public 
interface ATSRequestsDelegate { JSONObject tezVerticesListForDAG(String dagId); JSONObject tezDagByEntity(String entity); + + JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime); + + JSONObject hiveQueryEntityByEntityId(String hiveEntityId); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java index 471645d..f52c9da 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java @@ -31,7 +31,7 @@ import java.util.HashMap; public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { protected final static Logger LOG = - LoggerFactory.getLogger(ATSRequestsDelegateImpl.class); + LoggerFactory.getLogger(ATSRequestsDelegateImpl.class); public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }"; private ViewContext context; @@ -76,7 +76,7 @@ public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { } @Override - public JSONObject hiveQueryIdList(String username) { + public JSONObject hiveQueryIdsForUser(String username) { String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=requestuser:" + username; String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }"); return (JSONObject) JSONValue.parse(response); @@ -85,7 +85,7 @@ public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { @Override public JSONObject hiveQueryIdByOperationId(String operationId) { String hiveQueriesListUrl = hiveQueryIdOperationIdUrl(operationId); - String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }"); + String response = readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); return (JSONObject) JSONValue.parse(response); } @@ -103,6 +103,35 @@ public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { return (JSONObject) JSONValue.parse(response); } + /** + * fetches the HIVE_QUERY_ID entities from ATS for the given user within the given time window + * @param username: username for which to fetch hive query IDs + * @param startTime: time in milliseconds, inclusive + * @param endTime: time in milliseconds, exclusive + * @return + */ + @Override + public JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime) { + StringBuilder url = new StringBuilder(); + url.append(atsUrl).append("/ws/v1/timeline/HIVE_QUERY_ID?") + .append("windowStart=").append(startTime) + .append("&windowEnd=").append(endTime) + .append("&primaryFilter=requestuser:").append(username); + String hiveQueriesListUrl = url.toString(); + + String response = readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); + return (JSONObject) JSONValue.parse(response); + } + + @Override + public JSONObject hiveQueryEntityByEntityId(String hiveEntityId) { + StringBuilder url = new StringBuilder(); + url.append(atsUrl).append("/ws/v1/timeline/HIVE_QUERY_ID/").append(hiveEntityId); + String hiveQueriesListUrl = url.toString(); + String response = 
readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); + return (JSONObject) JSONValue.parse(response); + } + private String tezDagEntityUrl(String entity) { return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=callerId:" + entity; } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java index f51b880..547dfec 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java @@ -21,7 +21,7 @@ package org.apache.ambari.view.hive.resources.jobs.atsJobs; import java.util.List; public interface IATSParser { - List<HiveQueryId> getHiveQueryIdsList(String username); + List<HiveQueryId> getHiveQueryIdsForUser(String username); List<TezVertexId> getVerticesForDAGId(String dagId); @@ -30,4 +30,10 @@ public interface IATSParser { TezDagId getTezDAGByName(String name); TezDagId getTezDAGByEntity(String entity); + + List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime); + + HiveQueryId getHiveQueryIdByHiveEntityId(String hiveEntityId); + + List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveEntityIds); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java index 9afb21a..98d6589 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java @@ -22,20 +22,27 @@ package org.apache.ambari.view.hive.resources.jobs.viewJobs; import org.apache.ambari.view.hive.persistence.utils.Indexed; import org.apache.ambari.view.hive.persistence.utils.PersonalResource; +import java.beans.Transient; import java.io.Serializable; /** * Interface for Job bean to create Proxy for it */ public interface Job extends Serializable,Indexed,PersonalResource { - public static final String JOB_STATE_UNKNOWN = "Unknown"; - public static final String JOB_STATE_INITIALIZED = "Initialized"; - public static final String JOB_STATE_RUNNING = "Running"; - public static final String JOB_STATE_FINISHED = "Succeeded"; - public static final String JOB_STATE_CANCELED = "Canceled"; - public static final String JOB_STATE_CLOSED = "Closed"; - public static final String JOB_STATE_ERROR = "Error"; - public static final String JOB_STATE_PENDING = "Pending"; + String JOB_STATE_UNKNOWN = "Unknown"; + String JOB_STATE_INITIALIZED = "Initialized"; + String JOB_STATE_RUNNING = "Running"; + String JOB_STATE_FINISHED = "Succeeded"; + String JOB_STATE_CANCELED = "Canceled"; + String JOB_STATE_CLOSED = "Closed"; + String JOB_STATE_ERROR = "Error"; + String JOB_STATE_PENDING = "Pending"; + + @Transient + String getHiveQueryId(); + + @Transient + void 
setHiveQueryId(String hiveQueryId); String getId(); http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java index f6d6ed6..a408619 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java @@ -19,13 +19,27 @@ package org.apache.ambari.view.hive.resources.jobs.viewJobs; import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.*; +import org.apache.ambari.view.hive.client.Cursor; +import org.apache.ambari.view.hive.client.HiveClientException; +import org.apache.ambari.view.hive.client.HiveClientRuntimeException; +import org.apache.ambari.view.hive.client.HiveErrorStatusException; +import org.apache.ambari.view.hive.client.UserLocalConnection; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.*; +import org.apache.ambari.view.hive.resources.jobs.ConnectionController; +import org.apache.ambari.view.hive.resources.jobs.LogParser; +import org.apache.ambari.view.hive.resources.jobs.ModifyNotificationDelegate; +import org.apache.ambari.view.hive.resources.jobs.ModifyNotificationInvocationHandler; +import org.apache.ambari.view.hive.resources.jobs.NoOperationStatusSetException; +import org.apache.ambari.view.hive.resources.jobs.OperationHandleController; +import org.apache.ambari.view.hive.resources.jobs.OperationHandleControllerFactory; import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery; import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager; -import org.apache.ambari.view.hive.utils.*; +import org.apache.ambari.view.hive.utils.BadRequestFormattedException; +import org.apache.ambari.view.hive.utils.FilePaginator; +import org.apache.ambari.view.hive.utils.HiveClientFormattedException; +import org.apache.ambari.view.hive.utils.MisconfigurationFormattedException; +import org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.ambari.view.utils.hdfs.HdfsApi; import org.apache.ambari.view.utils.hdfs.HdfsApiException; import org.apache.ambari.view.utils.hdfs.HdfsUtil; @@ -36,7 +50,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Proxy; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Date; public class JobControllerImpl implements JobController, ModifyNotificationDelegate { private final static Logger LOG = @@ -268,15 +282,13 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg private static final long MillisInSecond = 1000L; public void updateJobDuration() { - job.setDuration(System.currentTimeMillis() / MillisInSecond - job.getDateSubmitted()); + job.setDuration((System.currentTimeMillis() / MillisInSecond) - (job.getDateSubmitted() / MillisInSecond)); } public void setCreationDate() { - job.setDateSubmitted(System.currentTimeMillis() / MillisInSecond); + 
job.setDateSubmitted(System.currentTimeMillis()); } - - private void setupLogFile() { LOG.debug("Creating log file for job#" + job.getId()); http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java index 437c9ba..2e5f0f7 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java @@ -55,6 +55,8 @@ public class JobImpl implements Job { private String logFile; private String confFile; + private String hiveQueryId; + public JobImpl() {} public JobImpl(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException { for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) { @@ -73,9 +75,8 @@ public class JobImpl implements Job { JobImpl job = (JobImpl) o; - if (id != null ? !id.equals(job.id) : job.id != null) return false; + return id != null ? id.equals(job.id) : job.id == null; - return true; } @Override @@ -84,6 +85,18 @@ public class JobImpl implements Job { } @Override + @Transient + public String getHiveQueryId() { + return hiveQueryId; + } + + @Override + @Transient + public void setHiveQueryId(String hiveQueryId) { + this.hiveQueryId = hiveQueryId; + } + + @Override public String getId() { return id; } http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobInfo.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobInfo.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobInfo.java new file mode 100644 index 0000000..4162594 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobInfo.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.viewJobs; + +public class JobInfo { + private String jobId; + private String hiveId; + private String dagId; + private String operationId; + + public JobInfo() { + } + + public JobInfo(String jobId, String hiveId, String dagId, String operationId) { + this.jobId = jobId; + this.hiveId = hiveId; + this.dagId = dagId; + this.operationId = operationId; + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getHiveId() { + return hiveId; + } + + public void setHiveId(String hiveId) { + this.hiveId = hiveId; + } + + public String getDagId() { + return dagId; + } + + public void setDagId(String dagId) { + this.dagId = dagId; + } + + public String getOperationId() { + return operationId; + } + + public void setOperationId(String operationId) { + this.operationId = operationId; + } + + @Override + public String toString() { + return new StringBuilder().append("JobInfo{" ) + .append("jobId=").append(jobId) + .append(", hiveId=").append(hiveId) + .append(", dagId=").append(dagId) + .append(", operationId=").append(operationId) + .append('}').toString(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/components/number-range-widget.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/components/number-range-widget.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/components/number-range-widget.js index 5d62b59..3b340ad 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/components/number-range-widget.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/components/number-range-widget.js @@ -54,17 +54,26 @@ export default Ember.Component.extend({ numberRange.set('fromDuration', utils.secondsToHHMMSS(numberRange.get('from'))); numberRange.set('toDuration', utils.secondsToHHMMSS(numberRange.get('to'))); }, - updateMin: function () { + updateFrom: function () { if (this.get('rendered')) { this.$('.slider').slider('values', 0, this.get('numberRange.from')); this.updateRangeLables(); } }.observes('numberRange.from'), - updateMax: function () { + updateTo: function () { if (this.get('rendered')) { this.$('.slider').slider('values', 1, this.get('numberRange.to')); this.updateRangeLables(); } - }.observes('numberRange.to') + }.observes('numberRange.to'), + + updateMin: function(){ + this.$( ".slider" ).slider( "option", "min", this.get('numberRange.min') ); + }.observes('numberRange.min'), + + updateMax: function(){ + this.$( ".slider" ).slider( "option", "max", this.get('numberRange.max') ); + }.observes('numberRange.max') + }); http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/history.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/history.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/history.js index 8c4ed2f..ca6233c 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/history.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/history.js @@ -23,50 +23,87 @@ import constants from 'hive/utils/constants'; export default Ember.ArrayController.extend(FilterableMixin, { jobService: Ember.inject.service('job'), fileService: 
Ember.inject.service('file'), - + historyService: Ember.inject.service('history'), + NUM_OF_DAYS: 5, + REFRESH_INTERVAL_SEC: 30000, sortAscending: false, sortProperties: ['dateSubmittedTimestamp'], + refresher: function () { + var self = this; + Ember.run.later(function () { + if (self.get('isShowing')) { + self.refresh(); + } + self.refresher(); + }, self.get('REFRESH_INTERVAL_SEC')); + }, + onLoadRoute: function () { + this.set('isShowing', true); + }, + onUnloadRoute: function () { + this.set('isShowing', false); + }, init: function () { - var oneMonthAgo = new Date(); - oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1); - this._super(); + var self = this; + var fromTime = moment().subtract(this.get('NUM_OF_DAYS'), 'days').startOf('day'); + var time = moment(); + var toTime = moment({ + years: time.year(), + months: time.month(), + date: time.date(), + hours: 23, + minutes: 59, + seconds: 59, + milliseconds: 999 + }); // next 12AM - this.set('columns', Ember.ArrayProxy.create({ content: Ember.A([ - Ember.Object.create({ - caption: 'columns.title', - property: 'title', - link: constants.namingConventions.subroutes.historyQuery - }), - Ember.Object.create({ - caption: 'columns.status', - property: 'status' - }), - Ember.Object.create({ - caption: 'columns.date', - property: 'dateSubmittedTimestamp', - dateRange: Ember.Object.create({ - min: oneMonthAgo, - max: new Date() + this.set('columns', Ember.ArrayProxy.create({ + content: Ember.A([ + Ember.Object.create({ + caption: 'columns.title', + property: 'title', + link: constants.namingConventions.subroutes.historyQuery + }), + Ember.Object.create({ + caption: 'columns.status', + property: 'status' + }), + Ember.Object.create({ + caption: 'columns.date', + property: 'dateSubmittedTimestamp', + dateRange: Ember.Object.create({ + min: fromTime.toDate(), + max: toTime.toDate() + }) + }), + Ember.Object.create({ + caption: 'columns.duration', + property: 'duration', + numberRange: Ember.Object.create({ + min: 0, + max: 10, + units: 'sec' + }) }) - }), - Ember.Object.create({ - caption: 'columns.duration', - property: 'duration', - numberRange: Ember.Object.create({ - min: 0, - max: 10, - units: 'sec' - }) - }) - ])})); - }, - - model: function () { - return this.filter(this.get('history')); - }.property('history', 'filters.@each'), + ]) + })); + return this.updateJobs(fromTime, toTime).then(function (data) { + self.applyDurationFilter(); + self.refresher(); + }); + }, + applyDurationFilter: function () { + var self = this; + var durationColumn = this.get('columns').find(function (column) { + return column.get('caption') === 'columns.duration'; + }); + var from = durationColumn.get('numberRange.from'); + var to = durationColumn.get('numberRange.to'); + self.filterBy("duration", {min: from, max: to}); + }, updateIntervals: function () { var durationColumn; var maxDuration; @@ -86,51 +123,101 @@ export default Ember.ArrayController.extend(FilterableMixin, { durationColumn.set('numberRange.min', minDuration); durationColumn.set('numberRange.max', maxDuration); + var from = durationColumn.get('numberRange.from'); + var to = durationColumn.get('numberRange.to'); + if (from > maxDuration) { + durationColumn.set("numberRange.from", maxDuration); + } + if (to < minDuration) { + durationColumn.set("numberRange.to", minDuration); + } } }.observes('history'), - updateDateRange: function () { - var dateColumn; - var maxDate; - var minDate; - - if (this.get('columns')) { - dateColumn = this.get('columns').find(function (column) { - return 
column.get('caption') === 'columns.date'; - }); - - var items = this.get('history').map(function (item) { - return item.get(dateColumn.get('property')); - }); - - minDate = items.length ? Math.min.apply(Math, items) : new Date(); - maxDate = items.length ? Math.max.apply(Math, items) : new Date(); + model: function () { + return this.filter(this.get('history')); + }.property('history', 'filters.@each'), - dateColumn.set('dateRange.min', minDate); - dateColumn.set('dateRange.max', maxDate); - } - }.observes('history'), + updateJobs: function (fromDate, toDate) { + var self = this; + var fromTime = moment(fromDate).startOf('day').toDate().getTime(); + var time = moment(toDate); + var toTime = moment({ + years: time.year(), + months: time.month(), + date: time.date(), + hours: 23, + minutes: 59, + seconds: 59, + milliseconds: 999 + }).toDate().getTime(); // next 12AM + this.set("fromTime", fromTime); + this.set("toTime", toTime); + return this.get("historyService").getJobs(fromTime, toTime).then(function (data) { + self.set('history', data); + }); + }, filterBy: function (filterProperty, filterValue, exactMatch) { var column = this.get('columns').find(function (column) { return column.get('property') === filterProperty; }); + var isDateColumn = column.get('caption') === 'columns.date'; + if (column) { column.set('filterValue', filterValue, exactMatch); + if (isDateColumn) { + + return this.updateJobs(filterValue.min, filterValue.max); + } else { + this.updateFilters(filterProperty, filterValue, exactMatch); + } } else { this.updateFilters(filterProperty, filterValue, exactMatch); } }, + refresh: function () { + var self = this; + this.get('historyService').getUpdatedJobList(this.get('toTime')).then(function (data) { + self.set('history', data); + }); + }, + actions: { + + refreshJobs: function () { + this.refresh(); + }, + + filterUpdated: function (filterProperty, filterValue) { + var self = this; + var column = this.get('columns').find(function (column) { + return column.get('property') === filterProperty; + }); + + var isDateColumn = (column.get('caption') === 'columns.date'); + + if (column) { + column.set('filterValue', filterValue); + if (isDateColumn) { + return this.updateJobs(filterValue.min, filterValue.max).then(function (data) { + self.updateFilters(filterProperty, filterValue); + }); + } else { + self.updateFilters(filterProperty, filterValue); + } + } + }, + sort: function (property) { //if same column has been selected, toggle flag, else default it to true if (this.get('sortProperties').objectAt(0) === property) { this.set('sortAscending', !this.get('sortAscending')); } else { this.set('sortAscending', true); - this.set('sortProperties', [ property ]); + this.set('sortProperties', [property]); } }, http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/initializers/i18n.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/initializers/i18n.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/initializers/i18n.js index c8bb7d0..a00f0b4 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/initializers/i18n.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/initializers/i18n.js @@ -197,6 +197,7 @@ TRANSLATIONS = { stoppingJob: 'Stopping...', close: 'Close', clearFilters: 'Clear filters', + refresh: 'Refresh', expand: 'Expand message', collapse: 'Collapse message', previousPage: 'previous', @@ -214,7 
+215,7 @@ TRANSLATIONS = { noTablesMatch: 'No tables match', noColumnsMatch: 'No columns match', table: 'Table ', - hoursShort: "{{hours}} hrs", + hrsShort: "{{hours}} hrs", minsShort: "{{minutes}} mins", secsShort: "{{seconds}} secs" }, http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/models/job.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/models/job.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/models/job.js index 9079b5a..185f512 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/models/job.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/models/job.js @@ -21,6 +21,7 @@ import DS from 'ember-data'; export default DS.Model.extend({ title: DS.attr('string'), queryId: DS.attr(), + hiveQueryId: DS.attr('string'), queryFile: DS.attr('string'), owner: DS.attr('string'), dataBase: DS.attr('string'), @@ -43,7 +44,7 @@ export default DS.Model.extend({ dateSubmittedTimestamp: function () { var date = this.get('dateSubmitted'); - return date ? date * 1000 : date; + return date; // dateSubmitted is now already in milliseconds, so no conversion is required. }.property('dateSubmitted'), uppercaseStatus: function () { http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/routes/history.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/routes/history.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/routes/history.js index 0aa3d41..e9fcf88 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/routes/history.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/routes/history.js @@ -17,23 +17,13 @@ */ import Ember from 'ember'; -import constants from 'hive/utils/constants'; export default Ember.Route.extend({ - notifyService: Ember.inject.service(constants.namingConventions.notify), - - model: function () { - var self = this; - - return this.store.find(constants.namingConventions.job).catch(function (error) { - self.get('notifyService').error(error); - }); + deactivate: function () { + this.controller.onUnloadRoute(); }, setupController: function (controller, model) { - if (!model) { - return; - } - controller.set('history', model); + this.controller.onLoadRoute(); } }); http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js new file mode 100644 index 0000000..4998d19 --- /dev/null +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js
new file mode 100644
index 0000000..4998d19
--- /dev/null
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/services/history.js
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Ember from 'ember';
+import Job from 'hive/models/job';
+import constants from 'hive/utils/constants';
+
+export default Ember.Service.extend({
+  historyJobsMap: {},
+  store: Ember.inject.service(),
+  fromDate: null,
+  toDate: null,
+
+  getJobs: function (fromDate, toDate) {
+    var self = this;
+    console.log("getJobs : fromDate : ", fromDate, ", toDate : ", toDate);
+
+    if (Ember.isEmpty(fromDate) || Ember.isEmpty(toDate)) {
+      throw new Error("Dates cannot be empty.");
+    }
+    if (toDate < fromDate) {
+      throw new Error("toDate cannot be smaller than fromDate");
+    }
+
+    var currFromDate = this.get("fromDate");
+    var currToDate = this.get("toDate");
+    var currJobsMap = this.get("historyJobsMap");
+
+    if (!Ember.isEmpty(currFromDate) && !Ember.isEmpty(currToDate)
+      && currFromDate <= fromDate && currToDate >= toDate
+      && !Ember.isEmpty(currJobsMap)
+    ) {
+      // the requested window is already cached: filter the current jobs and return
+      var validJobs = [];
+      Object.keys(currJobsMap).forEach(function (id) {
+        var job = currJobsMap[id];
+        if (job.get('dateSubmitted') >= fromDate && job.get('dateSubmitted') < toDate) {
+          validJobs.push(job);
+        }
+      });
+
+      return Ember.RSVP.Promise.resolve(validJobs);
+    }
+
+    return this.fetchJobs(fromDate, toDate).then(function (data) {
+      var jobMap = {};
+      var jobs = data.map(function (j) {
+        var job = this.get('store').push('job', j);
+        jobMap[job.id] = job;
+        return job;
+      }, self);
+      self.set('fromDate', fromDate);
+      self.set('toDate', toDate);
+      self.set('historyJobsMap', jobMap);
+      return jobs;
+    });
+  },
+
+  fetchJobs: function (fromDate, toDate) {
+    console.log("fetchJobs : fromDate : ", fromDate, ", toDate : ", toDate);
+
+    if (Ember.isEmpty(fromDate) || Ember.isEmpty(toDate)) {
+      throw new Error("Dates cannot be empty.");
+    }
+    if (toDate < fromDate) {
+      throw new Error("toDate cannot be smaller than fromDate");
+    }
+
+    var self = this;
+    var url = this.container.lookup('adapter:application').buildURL();
+    url += "/jobs";
+    var jobMap = {};
+    return Ember.$.ajax({
+      url: url,
+      type: 'GET',
+      data: {
+        "startTime": fromDate,
+        "endTime": toDate
+      },
+      headers: {
+        'X-Requested-By': 'ambari'
+      }
+    });
+  },
+
+  fetchAndMergeNew: function (toTime) {
+    var self = this;
+    return this.fetchNew(toTime).then(function (data) {
+      var jobMap = self.get('historyJobsMap');
+      var jobs = data.map(function (j) {
+        var job = this.get('store').push('job', j);
+        jobMap[job.id] = job;
+        return job;
+      }, self);
+      self.set('toDate', toTime);
+      return jobs;
+    });
+  },
+
+  getUpdatedJobList: function (toTime) {
+    var self = this;
+    return this.refreshAndFetchNew(toTime).then(function (data) {
+      var jobMap = self.get('historyJobsMap');
+      var allJobs = Object.keys(jobMap).map(function (id) {
+        return jobMap[id];
+      });
+      return allJobs;
+    });
+  },
+
+  fetchNew: function (toTime) {
+    var self = this;
+    var jobMap = this.get('historyJobsMap');
+    var fromTime = 0;
+    if (this.get('fromDate')) {
+      fromTime = this.get('fromDate');
+    }
+
+    Object.keys(jobMap).forEach(function (id) {
+      var job = jobMap[id];
+      fromTime = Math.max(fromTime, job.get('dateSubmitted'));
+    });
+
+    if (fromTime > toTime) {
+      // we already have the latest data
+      return Ember.RSVP.Promise.resolve([]);
+    }
+    return this.fetchJobs(fromTime, toTime);
+  },
+
+  refresh: function () {
+    var self = this;
+    var url = this.container.lookup('adapter:application').buildURL();
+    url += "/jobs/getList";
+    var jobMap = this.get('historyJobsMap');
+    var statuses = constants.statuses;
+    var jobIds = [];
+    Object.keys(jobMap).forEach(function (id) {
+      var job = jobMap[id];
+      var jobStatus = job.get('uppercaseStatus');
+      if (jobStatus === statuses.initialized
+        || jobStatus === statuses.pending
+        || jobStatus === statuses.running
+        || jobStatus === statuses.unknown
+      ) {
+        // note: jobId holds either the DB id or the hiveId
+        jobIds.push({
+          jobId: job.get('id'),
+          hiveId: job.get('hiveQueryId'),
+          dagId: job.get('dagId'),
+          operationId: job.get('operationId')
+        });
+      }
+    });
+
+    if (Ember.isEmpty(jobIds)) {
+      return Ember.RSVP.Promise.resolve([]);
+    }
+    console.log("refresh : jobIds to refresh : ", jobIds);
+    return Ember.$.ajax({
+      url: url,
+      type: 'POST',
+      data: JSON.stringify(jobIds),
+      headers: {
+        'X-Requested-By': 'ambari'
+      },
+      contentType: "application/json"
+    }).then(function (data) {
+      var jobs = data.map(function (j) {
+        var job = this.get('store').push('job', j);
+        jobMap[job.id] = job;
+        return job;
+      }, self);
+      self.set('historyJobsMap', jobMap);
+      // return all the jobs
+      var allJobs = Object.keys(jobMap).map(function (id) {
+        return jobMap[id];
+      });
+      return allJobs;
+    });
+  },
+
+  refreshAndFetchNew: function (toTime) {
+    var self = this;
+    return this.refresh().then(function (data) {
+      return self.fetchAndMergeNew(toTime);
+    });
+  }
+});
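The service above is the heart of the fix: getJobs keeps a date-window cache in historyJobsMap and answers from it whenever the requested [fromDate, toDate] range falls inside the cached one, so only a wider date filter or an explicit refresh hits the server. getUpdatedJobList then re-polls just the jobs still in a non-terminal state (via POST /jobs/getList) and fetches anything submitted after the newest cached dateSubmitted. A minimal consumer sketch; the property name historyService and the seven-day window are illustrative assumptions, not part of the patch:

    // sketch: calling the history service from a controller
    historyService: Ember.inject.service('history'),

    loadLastWeek: function () {
      var self = this;
      var toTime = Date.now();
      var fromTime = toTime - 7 * 24 * 60 * 60 * 1000; // seven days, in milliseconds
      // resolved from the cache when this window is already covered
      return this.get('historyService').getJobs(fromTime, toTime).then(function (jobs) {
        self.set('history', jobs);
      });
    },

    pollForUpdates: function () {
      var self = this;
      // re-polls unfinished jobs, then merges anything newer than the cache
      this.get('historyService').getUpdatedJobList(Date.now()).then(function (jobs) {
        self.set('history', jobs);
      });
    }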
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/templates/history.hbs
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/templates/history.hbs b/contrib/views/hive/src/main/resources/ui/hive-web/app/templates/history.hbs
index 052498e..7121b85 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/templates/history.hbs
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/templates/history.hbs
@@ -15,49 +15,53 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
 }}

-<table class="table table-expandable">
-  <thead>
+  <thead>
     <tr>
-        {{#each column in columns}}
+      {{#each column in columns}}
         <th>
-          {{#if column.caption}}
+          {{#if column.caption}}
             {{column-filter-widget class="pull-left"
-              column=column
-              filterValue=column.filterValue
-              sortAscending=controller.sortAscending
-              sortProperties=controller.sortProperties
-              columnSorted="sort"
-              columnFiltered="filter"}}
-          {{else}}
+                                   column=column
+                                   filterValue=column.filterValue
+                                   sortAscending=controller.sortAscending
+                                   sortProperties=controller.sortProperties
+                                   columnSorted="sort"
+                                   columnFiltered="filterUpdated"}}
+          {{else}}
             {{tb-helper "caption" column}}
-          {{/if}}
+          {{/if}}
+        </th>
+      {{/each}}
+      <th>
+        <button type="button" class="btn btn-primary btn-sm icon-refresh" {{action
+          "refreshJobs"}}><i class="fa fa-refresh" aria-hidden="true"></i>
+          {{t "buttons.refresh"}}</button>
+
+        <button type="button" class="btn btn-sm btn-warning pull-right clear-filters" {{action
+          "clearFilters"}}>{{t "buttons.clearFilters"}}</button>
       </th>
-      {{/each}}
-      <th>
-        <button type="btn" class="btn btn-sm btn-warning pull-right clear-filters" {{action "clearFilters"}}>{{t "buttons.clearFilters"}}</button>
-      </th>
     </tr>
-  </thead>
-  <tbody>
+  </thead>
+  <tbody>
     {{#if history.length}}
-      {{#if model.length}}
-        {{#each item in this}}
-          {{job-tr-view job=item onStopJob="interruptJob" onFileRequested="loadFile"}}
-        {{/each}}
-      {{else}}
-        <tr>
-          <td colspan="5">
+      {{#if model.length}}
+        {{#each item in this}}
+          {{job-tr-view job=item onStopJob="interruptJob" onFileRequested="loadFile"}}
+        {{/each}}
+      {{else}}
+        <tr>
+          <td colspan="5">
             <h4 class="empty-list">{{t "emptyList.history.noMatches"}}</h4>
-          </td>
-        </tr>
-      {{/if}}
+          </td>
+        </tr>
+      {{/if}}
     {{else}}
-      <tr>
+      <tr>
         <td colspan="5">
-          <h4 class="empty-list">{{t "emptyList.history.noItems"}}</h4>
+          <h4 class="empty-list">{{t "emptyList.history.noItems"}}</h4>
         </td>
-      </tr>
+      </tr>
     {{/if}}
-  </tbody>
+  </tbody>
 </table>
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/main/resources/ui/hive-web/app/utils/constants.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/utils/constants.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/utils/constants.js
index 4b9899a..ec8352b 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/utils/constants.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/utils/constants.js
@@ -61,13 +61,13 @@ export default Ember.Object.create({
     insertUdfs: 'insert-udfs',
     job: 'job',
     jobs: 'jobs',
-    history: 'history',
     savedQuery: 'saved-query',
     database: 'database',
     databases: 'databases',
     openQueries: 'open-queries',
     visualExplain: 'visual-explain',
     notify: 'notify',
+    history: 'history',
     tezUI: 'tez-ui',
     file: 'file',
     fileResource: 'file-resource',
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
index d8e60c4..43b0b65 100644
--- a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
+++ b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
@@ -18,7 +18,11 @@
 package org.apache.ambari.view.hive.resources.jobs;

-import org.apache.ambari.view.hive.resources.jobs.atsJobs.*;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSParser;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegate;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.HiveQueryId;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezDagId;
 import org.apache.commons.codec.binary.Base64;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -39,15 +43,15 @@ public class ATSParserTest {
   public void testGetHiveJobsList() throws Exception {
     IATSParser jobLoader = new ATSParser(new ATSRequestsDelegateStub());

-    List<HiveQueryId> jobs = jobLoader.getHiveQueryIdsList("hive");
+    List<HiveQueryId> jobs = jobLoader.getHiveQueryIdsForUser("hive");

     Assert.assertEquals(1, jobs.size());

     HiveQueryId job = jobs.get(0);
     Assert.assertEquals("hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0", job.entity);
-    Assert.assertEquals(1423493324L, job.starttime);
+    Assert.assertEquals(1423493324355L, job.starttime);
     Assert.assertEquals("hive", job.user);
-    Assert.assertEquals(1423493342L - 1423493324L, job.duration);
+    Assert.assertEquals((1423493342843L - 1423493324355L) / 1000L, job.duration);
     Assert.assertEquals("select count(*) from z", job.query);

     Assert.assertEquals(1, job.dagNames.size());
@@ -57,7 +61,7 @@ public class ATSParserTest {
     Assert.assertTrue(HiveQueryId.ATS_15_RESPONSE_VERSION > job.version);

     jobLoader = new ATSParser(new ATSV15RequestsDelegateStub());
-    List<HiveQueryId> jobsv2 = jobLoader.getHiveQueryIdsList("hive");
+    List<HiveQueryId> jobsv2 = jobLoader.getHiveQueryIdsForUser("hive");
     Assert.assertEquals(1, jobsv2.size());
     HiveQueryId jobv2 = jobsv2.get(0);
     Assert.assertTrue(HiveQueryId.ATS_15_RESPONSE_VERSION <= jobv2.version);
@@ -90,7 +94,7 @@ public class ATSParserTest {
      * This returns the version field that the ATS v1.5 returns.
      */
     @Override
-    public JSONObject hiveQueryIdList(String username) {
+    public JSONObject hiveQueryIdsForUser(String username) {
       return (JSONObject) JSONValue.parse(
           "{ \"entities\" : [ { \"domain\" : \"DEFAULT\",\n" +
               "          \"entity\" : \"hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0\",\n" +
@@ -120,6 +124,16 @@ public class ATSParserTest {

   protected static class ATSRequestsDelegateStub implements ATSRequestsDelegate {

+    public JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public JSONObject hiveQueryEntityByEntityId(String hiveEntityId) {
+      return null;
+    }
+
     @Override
     public String hiveQueryIdDirectUrl(String entity) {
       return null;
@@ -146,7 +160,7 @@ public class ATSParserTest {
     }

     @Override
-    public JSONObject hiveQueryIdList(String username) {
+    public JSONObject hiveQueryIdsForUser(String username) {
       return (JSONObject) JSONValue.parse(
           "{ \"entities\" : [ { \"domain\" : \"DEFAULT\",\n" +
               "          \"entity\" : \"hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0\",\n" +
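The updated assertions encode the new time units: ATS start times are now kept as epoch milliseconds (1423493324355 rather than the second-truncated 1423493324), while duration stays in whole seconds, so the expected value works out to (1423493342843 - 1423493324355) / 1000 = 18488 / 1000 = 18 under Java's integer division.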
http://git-wip-us.apache.org/repos/asf/ambari/blob/e88ca22c/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
index e1f7c7c..91478e7 100644
--- a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
+++ b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
@@ -31,7 +31,11 @@ import org.apache.hive.service.cli.thrift.TOperationHandle;
 import org.junit.Assert;
 import org.junit.Test;

-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;

 public class AggregatorTest {

@@ -60,7 +64,7 @@ public class AggregatorTest {
   @Test
   public void testReadJobWithHS2OutsideOfView() throws Exception {
     HiveQueryId hiveQueryId = getSampleHiveQueryId("ENTITY-NAME");
-    ensureOperationIdSet(hiveQueryId);
+    ensureOperationIdUnset(hiveQueryId);

     MockATSParser atsParser = getMockATSWithQueries(hiveQueryId);
     Aggregator aggregator = new Aggregator(getEmptyJobResourceManager(),
@@ -93,7 +97,7 @@ public class AggregatorTest {
     MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(getSampleViewJob("1"));

     StoredOperationHandle operationHandle = getSampleOperationHandle("5", "1");
-    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(operationHandle);
+    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(Arrays.asList(operationHandle), null);

     Aggregator aggregator = new Aggregator(jobResourceManager,
         operationHandleResourceManager,
@@ -106,13 +110,22 @@ public class AggregatorTest {
     Assert.assertEquals("1", job.getId());
   }

-  private MockOperationHandleResourceManager getOperationHandleRMWithEntities(StoredOperationHandle... operationHandles) {
+  private MockOperationHandleResourceManager getOperationHandleRMWithEntities(List<StoredOperationHandle> operationHandles, List<Job> jobs) {
     MockOperationHandleResourceManager operationHandleResourceManager = getEmptyOperationHandleResourceManager();
     HashMap<String, StoredOperationHandle> storage = new HashMap<String, StoredOperationHandle>();
     for (StoredOperationHandle handle : operationHandles) {
       storage.put(handle.getJobId(), handle);
     }
+    if (null != jobs) {
+      Iterator<Job> jobIterator = jobs.iterator();
+      HashMap<String, Job> jobStorage = new HashMap<String, Job>();
+      for (StoredOperationHandle handle : operationHandles) {
+        jobStorage.put(handle.getGuid(), jobIterator.next());
+        operationHandleResourceManager.setJobStorage(jobStorage);
+      }
+    }
     operationHandleResourceManager.setStorage(storage);
+
     return operationHandleResourceManager;
   }
@@ -122,11 +135,12 @@ public class AggregatorTest {
     hiveQueryId.operationId = Aggregator.hexStringToUrlSafeBase64("1b2b");
     MockATSParser atsParser = getMockATSWithQueries(hiveQueryId);

-    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(getSampleViewJob("1"));
+    Job job1 = getSampleViewJob("1");
+    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(job1);

     StoredOperationHandle operationHandle = getSampleOperationHandle("5", "1");
     operationHandle.setGuid("1b2b");
-    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(operationHandle);
+    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(Arrays.asList(operationHandle), Arrays.asList(job1));

     Aggregator aggregator = new Aggregator(jobResourceManager,
         operationHandleResourceManager,
@@ -145,11 +159,12 @@ public class AggregatorTest {
     hiveQueryId.operationId = Aggregator.hexStringToUrlSafeBase64("1b2b");
     MockATSParser atsParser = getMockATSWithQueries(hiveQueryId);

-    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(getSampleViewJob("1"));
+    Job job1 = getSampleViewJob("1");
+    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(job1);

     StoredOperationHandle operationHandle = getSampleOperationHandle("5", "1");
     operationHandle.setGuid("1b2b");
-    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(operationHandle);
+    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(Arrays.asList(operationHandle), Arrays.asList(job1));

     Aggregator aggregator = new Aggregator(jobResourceManager,
         operationHandleResourceManager,
@@ -172,12 +187,14 @@ public class AggregatorTest {
     HiveQueryId hiveQueryId1 = getSampleHiveQueryId("ENTITY-NAME");
     hiveQueryId1.operationId = Aggregator.hexStringToUrlSafeBase64("1a1b");
     Job job1 = getSampleViewJob("1");
+    Job job2 = getSampleViewJob("2");
     StoredOperationHandle operationHandle1 = getSampleOperationHandle("5", "1");
     operationHandle1.setGuid("1a1b");
-
+    StoredOperationHandle operationHandle2 = getSampleOperationHandle("5", "2");
+    operationHandle2.setGuid("2a2b");
     //job only on ATS
     HiveQueryId hiveQueryId2 = getSampleHiveQueryId("ENTITY-NAME2");
-    hiveQueryId2.operationId = Aggregator.hexStringToUrlSafeBase64("2a2a");
+    hiveQueryId2.operationId = Aggregator.hexStringToUrlSafeBase64("2a2b");

     //job only in View
     Job job3 = getSampleViewJob("3");
@@ -189,8 +206,8 @@ public class AggregatorTest {
         hiveQueryId1, hiveQueryId2);
     MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(
         job1, job3);
-    MockOperationHandleResourceManager operationHandleRM = getOperationHandleRMWithEntities(
-        operationHandle1, operationHandle3);
+    MockOperationHandleResourceManager operationHandleRM = getOperationHandleRMWithEntities(Arrays.asList(
+        operationHandle1, operationHandle2, operationHandle3), Arrays.asList(job1, job2, job3));

     Aggregator aggregator = new Aggregator(jobResourceManager,
         operationHandleRM,
@@ -327,6 +344,7 @@ public class AggregatorTest {

   public static class MockOperationHandleResourceManager implements IOperationHandleResourceManager {
     private HashMap<String, StoredOperationHandle> storage = new HashMap<String, StoredOperationHandle>();
+    private HashMap<String, Job> jobStorage = new HashMap<>();

     public MockOperationHandleResourceManager() {

@@ -348,7 +366,7 @@ public class AggregatorTest {

     @Override
     public Job getJobByHandle(StoredOperationHandle handle) throws ItemNotFound {
-      throw new ItemNotFound();
+      return jobStorage.get(handle.getGuid());
     }

     @Override
@@ -406,6 +424,14 @@ public class AggregatorTest {
     public void setStorage(HashMap<String, StoredOperationHandle> storage) {
       this.storage = storage;
     }
+
+    public HashMap<String, Job> getJobStorage() {
+      return jobStorage;
+    }
+
+    public void setJobStorage(HashMap<String, Job> jobStorage) {
+      this.jobStorage = jobStorage;
+    }
   }

   public static class MockATSParser implements IATSParser {

@@ -416,7 +442,7 @@ public class AggregatorTest {
     }

     @Override
-    public List<HiveQueryId> getHiveQueryIdsList(String username) {
+    public List<HiveQueryId> getHiveQueryIdsForUser(String username) {
       return hiveQueryIds;
     }

@@ -454,6 +480,21 @@ public class AggregatorTest {
       return dagId;
     }

+    @Override
+    public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) {
+      return null;
+    }
+
+    @Override
+    public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveEntityId) {
+      return null;
+    }
+
+    @Override
+    public List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveEntityIds) {
+      return null;
+    }
+
     public List<HiveQueryId> getHiveQueryIds() {
       return hiveQueryIds;
     }
