http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java new file mode 100644 index 0000000..a3623e9 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.files; + +import com.jayway.jsonpath.JsonPath; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.ViewResourceHandler; +import org.apache.ambari.view.commons.hdfs.UserService; +import org.apache.ambari.view.hive20.BaseService; +import org.apache.ambari.view.hive20.utils.*; +import org.apache.ambari.view.utils.hdfs.HdfsApi; +import org.apache.ambari.view.utils.hdfs.HdfsUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; + +/** + * File access resource + * API: + * GET /:path + * read entire file + * POST / + * create new file + * Required: filePath + * file should not already exists + * PUT /:path + * update file content + */ +public class FileService extends BaseService { + public static final String FAKE_FILE = "fakefile://"; + public static final String JSON_PATH_FILE = "jsonpath:"; + + @Inject + ViewResourceHandler handler; + + protected final static Logger LOG = + LoggerFactory.getLogger(FileService.class); + + /** + * Get single item + */ + @GET + @Path("{filePath:.*}") + @Produces(MediaType.APPLICATION_JSON) + public Response getFilePage(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException { + + LOG.debug("Reading file " + filePath); + try { + FileResource file = new FileResource(); + + if (page == null) + 
page = 0L; + + if (filePath.startsWith(FAKE_FILE)) { + if (page > 1) + throw new IllegalArgumentException("There's only one page in fake files"); + + String encodedContent = filePath.substring(FAKE_FILE.length()); + String content = new String(Base64.decodeBase64(encodedContent)); + + fillFakeFileObject(filePath, file, content); + } else if (filePath.startsWith(JSON_PATH_FILE)) { + if (page > 1) + throw new IllegalArgumentException("There's only one page in fake files"); + + String content = getJsonPathContentByUrl(filePath); + fillFakeFileObject(filePath, file, content); + } else { + + filePath = sanitizeFilePath(filePath); + FilePaginator paginator = new FilePaginator(filePath, getSharedObjectsFactory().getHdfsApi()); + + fillRealFileObject(filePath, page, file, paginator); + } + + JSONObject object = new JSONObject(); + object.put("file", file); + return Response.ok(object).status(200).build(); + } catch (WebApplicationException ex) { + throw ex; + } catch (FileNotFoundException ex) { + throw new NotFoundFormattedException(ex.getMessage(), ex); + } catch (IllegalArgumentException ex) { + throw new BadRequestFormattedException(ex.getMessage(), ex); + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + protected String getJsonPathContentByUrl(String filePath) throws IOException { + URL url = new URL(filePath.substring(JSON_PATH_FILE.length())); + + InputStream responseInputStream = context.getURLStreamProvider().readFrom(url.toString(), "GET", + (String)null, new HashMap<String, String>()); + String response = IOUtils.toString(responseInputStream); + + for (String ref : url.getRef().split("!")) { + response = JsonPath.read(response, ref); + } + return response; + } + + public void fillRealFileObject(String filePath, Long page, FileResource file, FilePaginator paginator) throws IOException, InterruptedException { + file.setFilePath(filePath); + file.setFileContent(paginator.readPage(page)); + 
file.setHasNext(paginator.pageCount() > page + 1); + file.setPage(page); + file.setPageCount(paginator.pageCount()); + } + + public void fillFakeFileObject(String filePath, FileResource file, String content) { + file.setFilePath(filePath); + file.setFileContent(content); + file.setHasNext(false); + file.setPage(0); + file.setPageCount(1); + } + + /** + * Delete single item + */ + @DELETE + @Path("{filePath:.*}") + public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException { + try { + filePath = sanitizeFilePath(filePath); + LOG.debug("Deleting file " + filePath); + if (getSharedObjectsFactory().getHdfsApi().delete(filePath, false)) { + return Response.status(204).build(); + } + throw new NotFoundFormattedException("FileSystem.delete returned false", null); + } catch (WebApplicationException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * Update item + */ + @PUT + @Path("{filePath:.*}") + @Consumes(MediaType.APPLICATION_JSON) + public Response updateFile(FileResourceRequest request, + @PathParam("filePath") String filePath) throws IOException, InterruptedException { + try { + filePath = sanitizeFilePath(filePath); + LOG.debug("Rewriting file " + filePath); + FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(filePath, true); + output.writeBytes(request.file.getFileContent()); + output.close(); + return Response.status(204).build(); + } catch (WebApplicationException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * Create script + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + public Response createFile(FileResourceRequest request, + @Context HttpServletResponse response, @Context UriInfo ui) + throws IOException, InterruptedException { + try { + LOG.debug("Creating file " + request.file.getFilePath()); + try { + FSDataOutputStream output = 
getSharedObjectsFactory().getHdfsApi().create(request.file.getFilePath(), false); + if (request.file.getFileContent() != null) { + output.writeBytes(request.file.getFileContent()); + } + output.close(); + } catch (FileAlreadyExistsException ex) { + throw new ServiceFormattedException("F020 File already exists", ex, 400); + } + response.setHeader("Location", + String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.getFilePath())); + return Response.status(204).build(); + } catch (WebApplicationException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * Checks connection to HDFS + * @param context View Context + */ + public static void hdfsSmokeTest(ViewContext context) { + try { + HdfsApi api = HdfsUtil.connectToHDFSApi(context); + api.getStatus(); + } catch (WebApplicationException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * Checks connection to User HomeDirectory + * @param context View Context + */ + public static void userhomeSmokeTest(ViewContext context) { + try { + UserService userservice = new UserService(context); + userservice.homeDir(); + } catch (WebApplicationException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + } + + /** + * Wrapper object for json mapping + */ + public static class FileResourceRequest { + public FileResource file; + } + + private String sanitizeFilePath(String filePath){ + if (!filePath.startsWith("/") && !filePath.startsWith(".")) { + filePath = "/" + filePath; // some servers strip double slashes in URL + } + return filePath; + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java new file mode 100644 index 0000000..c70585e --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.jobs; + +import akka.actor.ActorRef; +import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation; +import org.apache.ambari.view.hive20.persistence.utils.FilteringStrategy; +import org.apache.ambari.view.hive20.persistence.utils.Indexed; +import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.hive20.resources.IResourceManager; +import org.apache.ambari.view.hive20.resources.files.FileService; +import org.apache.ambari.view.hive20.resources.jobs.atsJobs.HiveQueryId; +import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser; +import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezDagId; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo; +import org.apache.commons.beanutils.PropertyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + +/** + * View Jobs and ATS Jobs aggregator. 
+ * There are 4 options: + * 1) ATS ExecuteJob without operationId + * *Meaning*: executed outside of HS2 + * - ExecuteJob info only from ATS + * 2) ATS ExecuteJob with operationId + * a) Hive View ExecuteJob with same operationId is not present + * *Meaning*: executed with HS2 + * - ExecuteJob info only from ATS + * b) Hive View ExecuteJob with operationId is present (need to merge) + * *Meaning*: executed with HS2 through Hive View + * - ExecuteJob info merged from ATS and from Hive View DataStorage + * 3) ExecuteJob present only in Hive View, ATS does not have it + * *Meaning*: executed through Hive View, but Hadoop ExecuteJob was not created + * it can happen if user executes query without aggregation, like just "select * from TABLE" + * - ExecuteJob info only from Hive View + */ +public class Aggregator { + protected final static Logger LOG = + LoggerFactory.getLogger(Aggregator.class); + + private final IATSParser ats; + private IResourceManager<Job> viewJobResourceManager; + private final ActorRef operationController; + + public Aggregator(IResourceManager<Job> jobResourceManager, + IATSParser ats, ActorRef operationController) { + this.viewJobResourceManager = jobResourceManager; + this.ats = ats; + this.operationController = operationController; + } + + /** + * gets all the jobs for 'username' where the job submission time is between 'startTime' (inclusive) + * and endTime (exclusive). + * Fetches the jobs from ATS and DB merges and update DB. returns the combined list. + * + * @param username: username for which jobs have to be fetched. 
+ * @param startTime: inclusive, time in secs from epoch + * @param endTime: exclusive, time in secs from epoch + * @return: list of jobs + */ + public List<Job> readAllForUserByTime(String username, long startTime, long endTime) { + List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime); + List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); + List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime); + allJobs.addAll(dbOnlyJobs); + + return allJobs; + } + + /** + * fetches the new state of jobs from ATS and from DB. Does merging/updating as required. + * @param jobInfos: infos of job to get + * @return: list of updated Job + */ + public List<Job> readJobsByIds(List<JobInfo> jobInfos) { + //categorize jobs + List<String> jobsWithHiveIds = new LinkedList<>(); + List<String> dbOnlyJobs = new LinkedList<>(); + + for (JobInfo jobInfo : jobInfos) { + if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) { + dbOnlyJobs.add(jobInfo.getJobId()); + } else { + jobsWithHiveIds.add(jobInfo.getHiveId()); + } + } + + List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds); + List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); + List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs); + + allJobs.addAll(dbJobs); + return allJobs; + } + + /** + * gets the jobs from the Database given their id + * @param jobsIds: list of ids of jobs + * @return: list of all the jobs found + */ + private List<Job> readJobsFromDbByJobId(List<String> jobsIds) { + List<Job> jobs = new LinkedList<>(); + for (final String jid : jobsIds) { + try { + Job job = getJobFromDbByJobId(jid); + jobs.add(job); + } catch (ItemNotFound itemNotFound) { + LOG.error("Error while finding job with id : {}", jid, itemNotFound); + } + } + + return jobs; + } + + /** + * fetches the job from DB given its id + * @param jobId: the id of the job to fetch + * @return: the job + * @throws ItemNotFound: if job with given 
id is not found in db + */ + private Job getJobFromDbByJobId(final String jobId) throws ItemNotFound { + if (null == jobId) + return null; + + List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + return item.getId().equals(jobId); + } + + @Override + public String whereStatement() { + return "id = '" + jobId + "'"; // even IDs are string + } + }); + + if (null != jobs && !jobs.isEmpty()) + return jobs.get(0); + + throw new ItemNotFound(String.format("Job with id %s not found.", jobId)); + } + + /** + * returns all the jobs from ATS and DB (for this instance) for the given user. + * @param username + * @return + */ + public List<Job> readAll(String username) { + List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username); + LOG.debug("HiveQueryIds fetched : {}", queries); + List<Job> allJobs = fetchDagsAndMergeJobs(queries); + List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null); + LOG.debug("Jobs only present in DB: {}", dbOnlyJobs); + allJobs.addAll(dbOnlyJobs); + return allJobs; + } + + /** + * reads all the jobs from DB for username and excludes the jobs mentioned in queries list + * @param username : username for which the jobs are to be read. + * @param queries : the jobs to exclude + * @param startTime: can be null, if not then the window start time for job + * @param endTime: can be null, if not then the window end time for job + * @return : the jobs in db that are not in the queries + */ + private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) { + List<Job> dbOnlyJobs = new LinkedList<>(); + HashMap<String, String> operationIdVsHiveId = new HashMap<>(); + + for (HiveQueryId hqid : queries) { + operationIdVsHiveId.put(hqid.operationId, hqid.entity); + } + LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId); + //cover case when operationId is present, but not exists in ATS + //e.g. 
optimized queries without executing jobs, like "SELECT * FROM TABLE" + List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username)); + for (Job job : jobs) { + if (null != startTime && null != endTime && null != job.getDateSubmitted() + && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid())) + ) { + continue; // don't include this in the result + } else { + dbOnlyJobs.add(job); + } + } + return dbOnlyJobs; + } + + private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) { + List<Job> allJobs = new LinkedList<Job>(); + + for (HiveQueryId atsHiveQuery : queries) { + JobImpl atsJob = null; + if (hasOperationId(atsHiveQuery)) { + try { + Job viewJob = getJobByOperationId(atsHiveQuery.operationId); + TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + atsJob = mergeHiveAtsTez(atsHiveQuery, atsTezDag, viewJob); + } catch (ItemNotFound itemNotFound) { + LOG.error("Ignore : {}", itemNotFound.getMessage()); + continue; + } + } else { + TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + atsJob = atsOnlyJob(atsHiveQuery, atsTezDag); + } + + atsJob.setHiveQueryId(atsHiveQuery.entity); + allJobs.add(atsJob); + } + + return allJobs; + } + + /** + * @param atsHiveQuery + * @param atsTezDag + * @param viewJob + * @return + */ + private JobImpl mergeHiveAtsTez(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) throws ItemNotFound { + saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob); + return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); + } + + public Job readATSJob(Job viewJob) throws ItemNotFound { + + if (viewJob.getStatus().equals(Job.JOB_STATE_INITIALIZED) || viewJob.getStatus().equals(Job.JOB_STATE_UNKNOWN)) + return viewJob; + + String hexGuid = viewJob.getGuid(); + + + HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexGuid); + + TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); + + 
saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob, true); + return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); + } + + private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) { + TezDagId atsTezDag; + if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) { + atsTezDag = ats.getTezDAGByEntity(atsHiveQuery.entity); + } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) { + String dagName = atsHiveQuery.dagNames.get(0); + + atsTezDag = ats.getTezDAGByName(dagName); + } else { + atsTezDag = new TezDagId(); + } + return atsTezDag; + } + + protected boolean hasOperationId(HiveQueryId atsHiveQuery) { + return atsHiveQuery.operationId != null; + } + + protected JobImpl mergeAtsJobWithViewJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) { + JobImpl atsJob; + try { + atsJob = new JobImpl(PropertyUtils.describe(viewJob)); + } catch (IllegalAccessException e) { + LOG.error("Can't instantiate JobImpl", e); + return null; + } catch (InvocationTargetException e) { + LOG.error("Can't instantiate JobImpl", e); + return null; + } catch (NoSuchMethodException e) { + LOG.error("Can't instantiate JobImpl", e); + return null; + } + fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag); + return atsJob; + } + + protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound { + saveJobInfoIfNeeded(hiveQueryId, tezDagId, viewJob, false); + } + + protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob, boolean useActorSystem) throws ItemNotFound { + boolean updateDb = false; + String dagName = null; + String dagId = null; + String applicationId = null; + if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) { + if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) { + dagName = hiveQueryId.dagNames.get(0); + updateDb = true; + } + } + if (tezDagId.status != null && 
(tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) && + !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) { + dagId = tezDagId.entity; + applicationId = tezDagId.applicationId; + updateDb = true; + } + + if(updateDb) { + if (useActorSystem) { + LOG.info("Saving DAG information via actor system for job id: {}", viewJob.getId()); + operationController.tell(new SaveDagInformation(viewJob.getId(), dagName, dagId, applicationId), ActorRef.noSender()); + } else { + viewJob.setDagName(dagName); + viewJob.setDagId(dagId); + viewJob.setApplicationId(applicationId); + viewJobResourceManager.update(viewJob, viewJob.getId()); + } + } + } + + protected JobImpl atsOnlyJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag) { + JobImpl atsJob = new JobImpl(); + atsJob.setId(atsHiveQuery.entity); + fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag); + + String query = atsHiveQuery.query; + atsJob.setTitle(query.substring(0, (query.length() > 42) ? 42 : query.length())); + + atsJob.setQueryFile(FileService.JSON_PATH_FILE + atsHiveQuery.url + "#otherinfo.QUERY!queryText"); + return atsJob; + } + + protected JobImpl fillAtsJobFields(JobImpl atsJob, HiveQueryId atsHiveQuery, TezDagId atsTezDag) { + atsJob.setApplicationId(atsTezDag.applicationId); + + if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) + atsJob.setDagName(atsHiveQuery.dagNames.get(0)); + atsJob.setDagId(atsTezDag.entity); + if (atsHiveQuery.starttime != 0) + atsJob.setDateSubmitted(atsHiveQuery.starttime); + atsJob.setDuration(atsHiveQuery.duration); + return atsJob; + } + + protected Job getJobByOperationId(final String opId) throws ItemNotFound { + List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + Job opHandle = (Job) item; + return opHandle.getGuid().equals(opId); + } + + @Override + public String whereStatement() { + return "guid='" + opId + "'"; + } + }); + + if (jobs.size() != 1) + throw new 
ItemNotFound(); + + return jobs.get(0); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java new file mode 100644 index 0000000..6156933 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.jobs; + +import org.apache.ambari.view.*; +import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.*; +import org.apache.ambari.view.hive20.utils.SharedObjectsFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Resource provider for job + */ +public class JobResourceProvider implements ResourceProvider<Job> { + @Inject + ViewContext context; + + protected JobResourceManager resourceManager = null; + protected final static Logger LOG = + LoggerFactory.getLogger(JobResourceProvider.class); + + protected synchronized JobResourceManager getResourceManager() { + if (resourceManager == null) { + resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context); + } + return resourceManager; + } + + @Override + public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + try { + return getResourceManager().read(resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + } + + @Override + public Set<Job> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + if (context == null) { + return new HashSet<Job>(); + } + return new HashSet<Job>(getResourceManager().readAll( + new OnlyOwnersFilteringStrategy(this.context.getUsername()))); + } + + @Override + public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException { + Job item = null; + try { + 
item = new JobImpl(stringObjectMap); + } catch (InvocationTargetException e) { + throw new SystemException("error on creating resource", e); + } catch (IllegalAccessException e) { + throw new SystemException("error on creating resource", e); + } + getResourceManager().create(item); + JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item); + try { + jobController.submit(); + } catch (Throwable throwable) { + throw new SystemException("error on creating resource", throwable); + } + } + + @Override + public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + Job item = null; + try { + item = new JobImpl(stringObjectMap); + } catch (InvocationTargetException e) { + throw new SystemException("error on updating resource", e); + } catch (IllegalAccessException e) { + throw new SystemException("error on updating resource", e); + } + try { + getResourceManager().update(item, resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + return true; + } + + @Override + public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + try { + getResourceManager().delete(resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java new file mode 100644 index 0000000..675ea37 --- /dev/null +++ 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java @@ -0,0 +1,626 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.view.hive20.resources.jobs;

import akka.actor.ActorRef;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Optional;
import org.apache.ambari.view.ViewResourceHandler;
import org.apache.ambari.view.hive20.BaseService;
import org.apache.ambari.view.hive20.ConnectionFactory;
import org.apache.ambari.view.hive20.ConnectionSystem;
import org.apache.ambari.view.hive20.actor.message.job.Failure;
import org.apache.ambari.view.hive20.backgroundjobs.BackgroundJobController;
import org.apache.ambari.view.hive20.client.AsyncJobRunner;
import org.apache.ambari.view.hive20.client.AsyncJobRunnerImpl;
import org.apache.ambari.view.hive20.client.ColumnDescription;
import org.apache.ambari.view.hive20.client.Cursor;
import org.apache.ambari.view.hive20.client.EmptyCursor;
import org.apache.ambari.view.hive20.client.HiveClientException;
import org.apache.ambari.view.hive20.client.NonPersistentCursor;
import org.apache.ambari.view.hive20.client.Row;
import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser;
import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController;
import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo;
import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager;
import org.apache.ambari.view.hive20.utils.MisconfigurationFormattedException;
import org.apache.ambari.view.hive20.utils.NotFoundFormattedException;
import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.lang.reflect.InvocationTargetException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Servlet for queries
 * API:
 * GET /:id
 *      read job
 * POST /
 *      create new job
 *      Required: title, queryFile
 * GET /
 *      get all Jobs of current user
 */
public class JobService extends BaseService {
  /** File name used for CSV downloads when the client does not supply one. */
  private static final String DEFAULT_CSV_FILE_NAME = "results.csv";

  @Inject
  ViewResourceHandler handler;

  private JobResourceManager resourceManager;

  protected final static Logger LOG =
      LoggerFactory.getLogger(JobService.class);
  private Aggregator aggregator;

  /**
   * Returns the job resource manager, creating it on first use.
   */
  protected synchronized JobResourceManager getResourceManager() {
    if (resourceManager == null) {
      SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
      resourceManager = new JobResourceManager(connectionsFactory, context);
    }
    return resourceManager;
  }

  /**
   * Returns the aggregator, creating it on first use.
   * Synchronized like {@link #getResourceManager()} so that the lazy
   * initialisation cannot run twice on concurrent calls.
   */
  protected synchronized Aggregator getAggregator() {
    if (aggregator == null) {
      IATSParser atsParser = getSharedObjectsFactory().getATSParser();
      ActorRef operationController = ConnectionSystem.getInstance().getOperationController(context);
      aggregator = new Aggregator(getResourceManager(), atsParser, operationController);
    }
    return aggregator;
  }

  /**
   * Replaces the aggregator instance returned by {@link #getAggregator()}.
   */
  protected synchronized void setAggregator(Aggregator aggregator) {
    this.aggregator = aggregator;
  }

  /**
   * Builds a job runner bound to the current view context.
   */
  private AsyncJobRunner newAsyncJobRunner() {
    ConnectionSystem system = ConnectionSystem.getInstance();
    return new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
  }

  /**
   * Makes a client supplied file name safe for use inside a quoted
   * Content-Disposition parameter: line breaks, double quotes and backslashes
   * are replaced, and a missing name falls back to the default.
   */
  private static String sanitizeFileName(String fileName) {
    if (fileName == null || fileName.isEmpty()) {
      return DEFAULT_CSV_FILE_NAME;
    }
    return fileName.replaceAll("[\\r\\n\"\\\\]", "_");
  }

  /**
   * Get single item
   *
   * For a job in the error or canceled state the recorded failure is looked up;
   * an SQLException with state AUTHFAIL on an LDAP enabled connection yields
   * HTTP 401, any other recorded failure is reported as a service error.
   */
  @GET
  @Path("{jobId}")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getOne(@PathParam("jobId") String jobId) {
    try {
      JobController jobController = getResourceManager().readController(jobId);

      Job job = jobController.getJob();
      // constant-first comparison: a job without a status must not cause an NPE
      if (Job.JOB_STATE_ERROR.equals(job.getStatus()) || Job.JOB_STATE_CANCELED.equals(job.getStatus())) {
        final AsyncJobRunner asyncJobRunner = newAsyncJobRunner();
        Optional<Failure> error = asyncJobRunner.getError(jobId, context.getUsername());

        if (error.isPresent()) {
          Throwable th = error.get().getError();
          if (th instanceof SQLException) {
            SQLException sqlException = (SQLException) th;
            // getSQLState() may be null; a null state must not mask the real failure
            if ("AUTHFAIL".equals(sqlException.getSQLState()) && ConnectionFactory.isLdapEnabled(context)) {
              return Response.status(401).build();
            }
          }
          throw new Exception(th);
        }
      }

      JSONObject jsonJob = jsonObjectFromJob(jobController);
      return Response.ok(jsonJob).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (ItemNotFound itemNotFound) {
      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Merges the job with its ATS data and wraps the bean properties in a JSON
   * object under the key "job".
   */
  private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
    Job hiveJob = jobController.getJobPOJO();

    Job mergedJob;
    try {
      mergedJob = getAggregator().readATSJob(hiveJob);
    } catch (ItemNotFound itemNotFound) {
      throw new ServiceFormattedException("E010 ExecuteJob not found", itemNotFound);
    }
    Map createdJobMap = PropertyUtils.describe(mergedJob);
    createdJobMap.remove("class"); // no need to show Bean class on client

    JSONObject jobJson = new JSONObject();
    jobJson.put("job", createdJobMap);
    return jobJson;
  }

  /**
   * Get job results in csv format
   *
   * The result set is streamed as UTF-8 CSV with a header row made of the
   * column names. The attachment name comes from the fileName parameter after
   * sanitising, or is "results.csv" when none is given.
   */
  @GET
  @Path("{jobId}/results/csv")
  @Produces("text/csv")
  public Response getResultsCSV(@PathParam("jobId") String jobId,
                                @Context HttpServletResponse response,
                                @QueryParam("fileName") String fileName,
                                @QueryParam("columns") final String requestedColumns) {
    try {

      final String username = context.getUsername();

      final AsyncJobRunner asyncJobRunner = newAsyncJobRunner();

      Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);

      if (!cursorOptional.isPresent()) {
        throw new Exception("Download failed");
      }

      final NonPersistentCursor resultSet = cursorOptional.get();

      StreamingOutput stream = new StreamingOutput() {
        @Override
        public void write(OutputStream os) throws IOException, WebApplicationException {
          // try-with-resources closes the printer (and with it the writer) on every path
          try (Writer writer = new BufferedWriter(
                   new OutputStreamWriter(os, java.nio.charset.StandardCharsets.UTF_8));
               CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {

            List<ColumnDescription> descriptions = resultSet.getDescriptions();
            List<String> headers = Lists.newArrayList();
            for (ColumnDescription description : descriptions) {
              headers.add(description.getName());
            }

            csvPrinter.printRecord(headers.toArray());

            while (resultSet.hasNext()) {
              csvPrinter.printRecord(resultSet.next().getRow());
              // flush per row so the client receives data while rows are still being fetched
              writer.flush();
            }
          }
        }
      };

      // the name is client supplied and ends up inside a quoted header value
      String safeFileName = sanitizeFileName(fileName);

      return Response.ok(stream).
          header("Content-Disposition", String.format("attachment; filename=\"%s\"", safeFileName)).
          build();

    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Throwable ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Save job results as a CSV file on HDFS using a background job.
   *
   * With commence=true a background job named "csv" + job id is started that
   * writes the rows of the result set to the target file; with stop=true that
   * background job is interrupted. The response always reports the current
   * state of the background job.
   */
  @GET
  @Path("{jobId}/results/csv/saveToHDFS")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getResultsToHDFS(@PathParam("jobId") String jobId,
                                   @QueryParam("commence") String commence,
                                   @QueryParam("file") final String targetFile,
                                   @QueryParam("stop") final String stop,
                                   @QueryParam("columns") final String requestedColumns,
                                   @Context HttpServletResponse response) {
    try {

      final JobController jobController = getResourceManager().readController(jobId);
      final String username = context.getUsername();

      String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId());
      if ("true".equals(commence)) {
        if (targetFile == null) {
          throw new MisconfigurationFormattedException("targetFile should not be empty");
        }

        final AsyncJobRunner asyncJobRunner = newAsyncJobRunner();

        Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);

        if (!cursorOptional.isPresent()) {
          throw new Exception("Download failed");
        }

        final NonPersistentCursor resultSet = cursorOptional.get();

        BackgroundJobController.getInstance(context).startJob(backgroundJobId, new Runnable() {
          @Override
          public void run() {
            // all three resources are closed in reverse order, even when setup of a later one fails
            try (FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true);
                 Writer writer = new BufferedWriter(
                     new OutputStreamWriter(stream, java.nio.charset.StandardCharsets.UTF_8));
                 CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {
              while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) {
                csvPrinter.printRecord(resultSet.next().getRow());
                writer.flush();
              }
            } catch (IOException e) {
              throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
            } catch (InterruptedException e) {
              // restore the flag so the owner of this thread can still observe the interruption
              Thread.currentThread().interrupt();
              throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
            }
          }
        });
      }

      if ("true".equals(stop)) {
        BackgroundJobController.getInstance(context).interrupt(backgroundJobId);
      }

      JSONObject object = new JSONObject();
      object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId));
      object.put("jobId", jobController.getJob().getId());
      object.put("backgroundJobId", backgroundJobId);
      object.put("operationType", "CSV2HDFS");
      object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString());

      return Response.ok(object).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (ItemNotFound itemNotFound) {
      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Get the stored status of a job as JSON with the keys "jobStatus" and "jobId".
   */
  @Path("{jobId}/status")
  @GET
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response fetchJobStatus(@PathParam("jobId") String jobId) throws ItemNotFound, HiveClientException, NoOperationStatusSetException {
    JobController jobController = getResourceManager().readController(jobId);
    Job job = jobController.getJob();
    String jobStatus = job.getStatus();

    LOG.info("jobStatus : {} for jobId : {}", jobStatus, jobId);

    JSONObject jsonObject = new JSONObject();
    jsonObject.put("jobStatus", jobStatus);
    jsonObject.put("jobId", jobId);

    return Response.ok(jsonObject).build();
  }

  /**
   * Get next results page
   *
   * With first=true the cursor of the job is reset before reading; when the
   * runner has no cursor for the job an empty cursor is paged instead.
   */
  @GET
  @Path("{jobId}/results")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getResults(@PathParam("jobId") final String jobId,
                             @QueryParam("first") final String fromBeginning,
                             @QueryParam("count") Integer count,
                             @QueryParam("searchId") String searchId,
                             @QueryParam("format") String format,
                             @QueryParam("columns") final String requestedColumns) {
    try {

      final String username = context.getUsername();

      final AsyncJobRunner asyncJobRunner = newAsyncJobRunner();

      return ResultsPaginationController.getInstance(context)
          .request(jobId, searchId, true, fromBeginning, count, format, requestedColumns,
              new Callable<Cursor<Row, ColumnDescription>>() {
                @Override
                public Cursor call() throws Exception {
                  Optional<NonPersistentCursor> cursor;
                  if ("true".equals(fromBeginning)) {
                    cursor = asyncJobRunner.resetAndGetCursor(jobId, username);
                  } else {
                    cursor = asyncJobRunner.getCursor(jobId, username);
                  }
                  if (cursor.isPresent()) {
                    return cursor.get();
                  }
                  return new EmptyCursor();
                }
              }).build();

    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Renew expiration time for results
   */
  @GET
  @Path("{jobId}/results/keepAlive")
  public Response keepAliveResults(@PathParam("jobId") String jobId,
                                   @QueryParam("first") String fromBeginning,
                                   @QueryParam("count") Integer count) {
    try {
      if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) {
        throw new NotFoundFormattedException("Results already expired", null);
      }
      return Response.ok().build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Get progress info
   */
  @GET
  @Path("{jobId}/progress")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getProgress(@PathParam("jobId") String jobId) {
    try {
      final JobController jobController = getResourceManager().readController(jobId);

      ProgressRetriever.Progress progress = new ProgressRetriever(jobController.getJob(), getSharedObjectsFactory()).
          getProgress();

      return Response.ok(progress).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (ItemNotFound itemNotFound) {
      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Delete single item
   *
   * The job is always canceled; it is removed from storage only when
   * remove=true is passed.
   */
  @DELETE
  @Path("{id}")
  public Response delete(@PathParam("id") String id,
                         @QueryParam("remove") final String remove) {
    try {
      // an unknown id surfaces as ItemNotFound and is mapped to "not found" below
      JobController jobController = getResourceManager().readController(id);
      jobController.cancel();
      if ("true".equals(remove)) {
        getResourceManager().delete(id);
      }
      return Response.status(204).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (ItemNotFound itemNotFound) {
      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Get all Jobs
   *
   * The session tag of every job is cleared before the list is returned.
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public Response getList(@QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) {
    try {

      LOG.debug("Getting all job: startTime: {}, endTime: {}", startTime, endTime);
      List<Job> allJobs = getAggregator().readAllForUserByTime(context.getUsername(), startTime, endTime);
      for (Job job : allJobs) {
        job.setSessionTag(null);
      }
      JSONObject result = new JSONObject();
      result.put("jobs", allJobs);
      return Response.ok(result).build();
    } catch (WebApplicationException ex) {
      LOG.error("Exception occured while fetching all jobs.", ex);
      throw ex;
    } catch (Exception ex) {
      LOG.error("Exception occured while fetching all jobs.", ex);
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * fetch the jobs with given info.
   * provide as much info about the job so that next api can optimize the fetch process.
   * @param jobInfos descriptions of the jobs to fetch
   * @return the matching jobs with their session tag cleared
   */
  @Path("/getList")
  @POST
  @Produces(MediaType.APPLICATION_JSON)
  @Consumes(MediaType.APPLICATION_JSON)
  public List<Job> getList(List<JobInfo> jobInfos) {
    try {
      LOG.debug("fetching jobs with ids :{}", jobInfos);
      List<Job> allJobs = getAggregator().readJobsByIds(jobInfos);
      for (Job job : allJobs) {
        job.setSessionTag(null);
      }

      return allJobs;
    } catch (WebApplicationException ex) {
      LOG.error("Exception occured while fetching all jobs.", ex);
      throw ex;
    } catch (Exception ex) {
      LOG.error("Exception occured while fetching all jobs.", ex);
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Create job
   *
   * Responds with HTTP 201, the created job as JSON and a Location header
   * pointing at the new job.
   */
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public Response create(JobRequest request, @Context HttpServletResponse response,
                         @Context UriInfo ui) {
    try {
      Map jobInfo = PropertyUtils.describe(request.job);
      Job job = new JobImpl(jobInfo);
      JobController createdJobController = new JobServiceInternal().createJob(job, getResourceManager());
      JSONObject jobObject = jsonObjectFromJob(createdJobController);
      response.setHeader("Location",
          String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId()));
      return Response.ok(jobObject).status(201).build();
    } catch (WebApplicationException ex) {
      LOG.error("Error occurred while creating job : ", ex);
      throw ex;
    } catch (ItemNotFound itemNotFound) {
      LOG.error("Error occurred while creating job : ", itemNotFound);
      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
    } catch (Throwable ex) {
      LOG.error("Error occurred while creating job : ", ex);
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Remove connection credentials
   *
   * The removal itself is commented out below, so this currently only answers HTTP 200.
   */
  @DELETE
  @Path("auth")
  public Response removePassword() {
    try {
      //new UserLocalHiveAuthCredentials().remove(context);
      //connectionLocal.remove(context);  // force reconnect on next get
      return Response.ok().status(200).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Invalidate session
   *
   * The invalidation itself is commented out below, so this currently only answers HTTP 200.
   */
  @DELETE
  @Path("sessions/{sessionTag}")
  public Response invalidateSession(@PathParam("sessionTag") String sessionTag) {
    try {
      //Connection connection = connectionLocal.get(context);
      //connection.invalidateSessionByTag(sessionTag);
      return Response.ok().build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Session status
   *
   * The session lookup is commented out below, so every tag is currently
   * reported with "actual" set to true.
   */
  @GET
  @Path("sessions/{sessionTag}")
  @Produces(MediaType.APPLICATION_JSON)
  public Response sessionStatus(@PathParam("sessionTag") String sessionTag) {
    try {
      //Connection connection = connectionLocal.get(context);

      JSONObject session = new JSONObject();
      session.put("sessionTag", sessionTag);
      try {
        //connection.getSessionByTag(sessionTag);
        session.put("actual", true);
      } catch (Exception /*HiveClientException*/ ex) {
        session.put("actual", false);
      }

      //TODO: New implementation

      JSONObject status = new JSONObject();
      status.put("session", session);
      return Response.ok(status).build();
    } catch (WebApplicationException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new ServiceFormattedException(ex.getMessage(), ex);
    }
  }

  /**
   * Wrapper object for json mapping
   */
  public static class JobRequest {
    public JobImpl job;
  }

  /**
   * Wrapper for authentication json mapping
   */
  public static class AuthRequest {
    public String password;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java new file mode 100644 index 0000000..1409ba8 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java @@ -0,0 +1,35 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.ambari.view.hive20.resources.jobs; + +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager; + +public class JobServiceInternal { + public JobController createJob(Job job, JobResourceManager resourceManager) throws Throwable { + resourceManager.create(job); + + JobController createdJobController = resourceManager.readController(job.getId()); + createdJobController.submit(); + resourceManager.saveIfModified(createdJobController); + return createdJobController; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java new file mode 100644 index 0000000..073cdc7 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs; + +public interface ModifyNotificationDelegate { + boolean onModification(Object object); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java new file mode 100644 index 0000000..51058f5 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; + +public class ModifyNotificationInvocationHandler implements InvocationHandler { + private Object proxied; + private ModifyNotificationDelegate modifyDelegate; + + public ModifyNotificationInvocationHandler(Object proxied, ModifyNotificationDelegate delegate) { + this.proxied = proxied; + this.modifyDelegate = delegate; + } + + @Override + public Object invoke(Object o, Method method, Object[] args) throws Throwable { + if (method.getName().startsWith("set")) { + modifyDelegate.onModification(proxied); + } + return method.invoke(proxied, args); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java new file mode 100644 index 0000000..31d97d0 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs; + + +public class NoOperationStatusSetException extends Exception { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java new file mode 100644 index 0000000..4d8c7d7 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs; + +import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezVertexId; +import org.apache.ambari.view.hive20.resources.jobs.rm.RMParser; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive20.utils.ServiceFormattedException; +import org.apache.ambari.view.hive20.utils.SharedObjectsFactory; + +import java.util.List; + +public class ProgressRetriever { + private final Progress progress; + private final Job job; + private final SharedObjectsFactory sharedObjects; + + public ProgressRetriever(Job job, SharedObjectsFactory sharedObjects) { + this.job = job; + this.sharedObjects = sharedObjects; + + this.progress = new Progress(); + } + + public Progress getProgress() { + jobCheck(); + + progress.dagProgress = sharedObjects.getRMParser().getDAGProgress( + job.getApplicationId(), job.getDagId()); + + List<TezVertexId> vertices = sharedObjects.getATSParser().getVerticesForDAGId(job.getDagId()); + progress.vertexProgresses = sharedObjects.getRMParser().getDAGVerticesProgress(job.getApplicationId(), job.getDagId(), vertices); + + return progress; + } + + public void jobCheck() { + if (job.getApplicationId() == null || job.getApplicationId().isEmpty()) { + throw new ServiceFormattedException("E070 ApplicationId is not defined yet"); + } + if (job.getDagId() == null || job.getDagId().isEmpty()) { + throw new ServiceFormattedException("E080 DagID is not defined yet"); + } + } + + public static class Progress { + public Double dagProgress; + public List<RMParser.VertexProgress> vertexProgresses; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java new file mode 100644 index 0000000..6efa2a9 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.jobs; + + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive20.client.ColumnDescription; +import org.apache.ambari.view.hive20.client.Cursor; +import org.apache.ambari.view.hive20.client.HiveClientException; +import org.apache.ambari.view.hive20.client.Row; +import org.apache.ambari.view.hive20.utils.BadRequestFormattedException; +import org.apache.ambari.view.hive20.utils.ResultFetchFormattedException; +import org.apache.ambari.view.hive20.utils.ResultNotReadyFormattedException; +import org.apache.ambari.view.hive20.utils.ServiceFormattedException; +import org.apache.commons.collections4.map.PassiveExpiringMap; +import org.apache.hadoop.hbase.util.Strings; + +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +/** + * Results Pagination Controller + * Persists cursors for result sets + */ +public class ResultsPaginationController { + public static final String DEFAULT_SEARCH_ID = "default"; + private static Map<String, ResultsPaginationController> viewSingletonObjects = new HashMap<String, ResultsPaginationController>(); + public static ResultsPaginationController getInstance(ViewContext context) { + if (!viewSingletonObjects.containsKey(context.getInstanceName())) + viewSingletonObjects.put(context.getInstanceName(), new ResultsPaginationController()); + return viewSingletonObjects.get(context.getInstanceName()); + } + + public ResultsPaginationController() { + } + + private static final long EXPIRING_TIME = 10*60*1000; // 10 minutes + private static final int DEFAULT_FETCH_COUNT = 50; + private Map<String, 
Cursor<Row, ColumnDescription>> resultsCache; + + public static class CustomTimeToLiveExpirationPolicy extends PassiveExpiringMap.ConstantTimeToLiveExpirationPolicy<String, Cursor<Row, ColumnDescription>> { + public CustomTimeToLiveExpirationPolicy(long timeToLiveMillis) { + super(timeToLiveMillis); + } + + @Override + public long expirationTime(String key, Cursor<Row, ColumnDescription> value) { + if (key.startsWith("$")) { + return -1; //never expire + } + return super.expirationTime(key, value); + } + } + + private Map<String, Cursor<Row, ColumnDescription>> getResultsCache() { + if (resultsCache == null) { + PassiveExpiringMap<String, Cursor<Row, ColumnDescription>> resultsCacheExpiringMap = + new PassiveExpiringMap<>(new CustomTimeToLiveExpirationPolicy(EXPIRING_TIME)); + resultsCache = Collections.synchronizedMap(resultsCacheExpiringMap); + } + return resultsCache; + } + + /** + * Renew timer of cache entry. + * @param key name/id of results request + * @return false if entry not found; true if renew was ok + */ + public boolean keepAlive(String key, String searchId) { + if (searchId == null) + searchId = DEFAULT_SEARCH_ID; + String effectiveKey = key + "?" 
+ searchId; + if (!getResultsCache().containsKey(effectiveKey)) { + return false; + } + Cursor cursor = getResultsCache().get(effectiveKey); + getResultsCache().put(effectiveKey, cursor); + cursor.keepAlive(); + return true; + } + + private Cursor<Row, ColumnDescription> getResultsSet(String key, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) { + if (!getResultsCache().containsKey(key)) { + Cursor resultSet; + try { + resultSet = makeResultsSet.call(); + if (resultSet.isResettable()) { + resultSet.reset(); + } + } catch (ResultNotReadyFormattedException | ResultFetchFormattedException ex) { + throw ex; + } catch (Exception ex) { + throw new ServiceFormattedException(ex.getMessage(), ex); + } + getResultsCache().put(key, resultSet); + } + + return getResultsCache().get(key); + } + + public Response.ResponseBuilder request(String key, String searchId, boolean canExpire, String fromBeginning, Integer count, String format, String requestedColumns, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) throws HiveClientException { + if (searchId == null) + searchId = DEFAULT_SEARCH_ID; + key = key + "?" 
+ searchId; + if (!canExpire) + key = "$" + key; + if (fromBeginning != null && fromBeginning.equals("true") && getResultsCache().containsKey(key)) { + + getResultsCache().remove(key); + } + + Cursor<Row, ColumnDescription> resultSet = getResultsSet(key, makeResultsSet); + + if (count == null) + count = DEFAULT_FETCH_COUNT; + + List<ColumnDescription> allschema = resultSet.getDescriptions(); + List<Row> allRowEntries = FluentIterable.from(resultSet) + .limit(count).toList(); + + List<ColumnDescription> schema = allschema; + + final Set<Integer> selectedColumns = getRequestedColumns(requestedColumns); + if (!selectedColumns.isEmpty()) { + schema = filter(allschema, selectedColumns); + } + + List<Object[]> rows = FluentIterable.from(allRowEntries) + .transform(new Function<Row, Object[]>() { + @Override + public Object[] apply(Row input) { + if(!selectedColumns.isEmpty()) { + return filter(Lists.newArrayList(input.getRow()), selectedColumns).toArray(); + } else { + return input.getRow(); + } + } + }).toList(); + + int read = rows.size(); + if(format != null && format.equalsIgnoreCase("d3")) { + List<Map<String,Object>> results = new ArrayList<>(); + for(int i=0; i<rows.size(); i++) { + Object[] row = rows.get(i); + Map<String, Object> keyValue = new HashMap<>(row.length); + for(int j=0; j<row.length; j++) { + //Replace dots in schema with underscore + String schemaName = schema.get(j).getName(); + keyValue.put(schemaName.replace('.','_'), row[j]); + } + results.add(keyValue); + } + return Response.ok(results); + } else { + ResultsResponse resultsResponse = new ResultsResponse(); + resultsResponse.setSchema(schema); + resultsResponse.setRows(rows); + resultsResponse.setReadCount(read); + resultsResponse.setHasNext(resultSet.hasNext()); + // resultsResponse.setSize(resultSet.size()); + resultsResponse.setOffset(resultSet.getOffset()); + resultsResponse.setHasResults(true); + return Response.ok(resultsResponse); + } + } + + private <T> List<T> filter(List<T> list, 
Set<Integer> selectedColumns) { + List<T> filtered = Lists.newArrayList(); + for(int i: selectedColumns) { + if(list != null && list.get(i) != null) + filtered.add(list.get(i)); + } + + return filtered; + } + + private Set<Integer> getRequestedColumns(String requestedColumns) { + if(Strings.isEmpty(requestedColumns)) { + return new HashSet<>(); + } + Set<Integer> selectedColumns = Sets.newHashSet(); + for (String columnRequested : requestedColumns.split(",")) { + try { + selectedColumns.add(Integer.parseInt(columnRequested)); + } catch (NumberFormatException ex) { + throw new BadRequestFormattedException("Columns param should be comma-separated integers", ex); + } + } + return selectedColumns; + } + + private static class ResultsResponse { + private List<ColumnDescription> schema; + private List<String[]> rows; + private int readCount; + private boolean hasNext; + private long offset; + private boolean hasResults; + + public void setSchema(List<ColumnDescription> schema) { + this.schema = schema; + } + + public List<ColumnDescription> getSchema() { + return schema; + } + + public void setRows(List<Object[]> rows) { + if( null == rows ){ + this.rows = null; + } + this.rows = new ArrayList<String[]>(rows.size()); + for(Object[] row : rows ){ + String[] strs = new String[row.length]; + for( int colNum = 0 ; colNum < row.length ; colNum++ ){ + String value = String.valueOf(row[colNum]); + if(row[colNum] != null && (value.isEmpty() || value.equalsIgnoreCase("null"))){ + strs[colNum] = String.format("\"%s\"",value); + }else{ + strs[colNum] = value; + } + } + this.rows.add(strs); + } + } + + public List<String[]> getRows() { + return rows; + } + + public void setReadCount(int readCount) { + this.readCount = readCount; + } + + public void setHasNext(boolean hasNext) { + this.hasNext = hasNext; + } + + public boolean isHasNext() { + return hasNext; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + 
public boolean getHasResults() { + return hasResults; + } + + public void setHasResults(boolean hasResults) { + this.hasResults = hasResults; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java new file mode 100644 index 0000000..6e9753d --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.jobs.atsJobs; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +/** + * Parser of ATS responses + */ +public class ATSParser implements IATSParser { + protected final static Logger LOG = + LoggerFactory.getLogger(ATSParser.class); + + private ATSRequestsDelegate delegate; + + private static final long MillisInSecond = 1000L; + + public ATSParser(ATSRequestsDelegate delegate) { + this.delegate = delegate; + } + + /** + * returns all HiveQueryIDs from ATS for the given user. + * @param username + * @return + */ + @Override + public List<HiveQueryId> getHiveQueryIdsForUser(String username) { + JSONObject entities = delegate.hiveQueryIdsForUser(username); + return parseHqidJsonFromATS(entities); + } + + /** + * parses the JSONArray or hive query IDs + * @param entities: should contain 'entities' element as JSONArray + * @return + */ + private List<HiveQueryId> parseHqidJsonFromATS(JSONObject entities) { + JSONArray jobs = (JSONArray) entities.get("entities"); + + return getHqidListFromJsonArray(jobs); + } + + /** + * parses List of HiveQueryIds from JSON + * @param jobs + * @return + */ + private List<HiveQueryId> getHqidListFromJsonArray(JSONArray jobs) { + List<HiveQueryId> parsedJobs = new LinkedList<>(); + for (Object job : jobs) { + try { + HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job); + parsedJobs.add(parsedJob); + } catch (Exception ex) { + LOG.error("Error while parsing ATS job", ex); + } + } + + return parsedJobs; + } + + @Override + public List<TezVertexId> getVerticesForDAGId(String dagId) { + JSONObject entities = delegate.tezVerticesListForDAG(dagId); + JSONArray vertices = (JSONArray) entities.get("entities"); + + List<TezVertexId> parsedVertices = new LinkedList<TezVertexId>(); + for(Object vertex : vertices) { + 
try { + TezVertexId parsedVertex = parseVertex((JSONObject) vertex); + parsedVertices.add(parsedVertex); + } catch (Exception ex) { + LOG.error("Error while parsing the vertex", ex); + } + } + + return parsedVertices; + } + + @Override + public HiveQueryId getHiveQueryIdByOperationId(String guidString) { + JSONObject entities = delegate.hiveQueryIdByOperationId(guidString); + return getHiveQueryIdFromJson(entities); + } + + private HiveQueryId getHiveQueryIdFromJson(JSONObject entities) { + JSONArray jobs = (JSONArray) entities.get("entities"); + + if (jobs.size() == 0) { + return new HiveQueryId(); + } + + return parseAtsHiveJob((JSONObject) jobs.get(0)); + } + + /** + * returns the hive entity from ATS. empty object if not found. + * + * @param hiveId: the entityId of the hive + * @return: empty entity if not found else HiveQueryId + */ + @Override + public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveId) { + JSONObject entity = delegate.hiveQueryEntityByEntityId(hiveId); + return parseAtsHiveJob(entity); + } + + @Override + public TezDagId getTezDAGByName(String name) { + JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities"); + return parseTezDag(tezDagEntities); + } + + @Override + public TezDagId getTezDAGByEntity(String entity) { + JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities"); + return parseTezDag(tezDagEntities); + } + + /** + * fetches the HIVE_QUERY_ID from ATS for given user between given time period + * + * @param username: username for which to fetch hive query IDs + * @param startTime: time in miliseconds, inclusive + * @param endTime: time in miliseconds, exclusive + * @return: List of HIVE_QUERY_ID + */ + @Override + public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) { + JSONObject entities = delegate.hiveQueryIdsForUserByTime(username, startTime, endTime); + return parseHqidJsonFromATS(entities); + } + + @Override + public 
List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveIds) { + List<HiveQueryId> hiveQueryIds = new LinkedList<>(); + for (String id : hiveIds) { + HiveQueryId hqi = this.getHiveQueryIdByHiveEntityId(id); + if (null != hqi.entity) { + hiveQueryIds.add(hqi); + } + } + return hiveQueryIds; + } + + private TezDagId parseTezDag(JSONArray tezDagEntities) { + assert tezDagEntities.size() <= 1; + if (tezDagEntities.size() == 0) { + return new TezDagId(); + } + JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0); + + TezDagId parsedDag = new TezDagId(); + JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId"); + parsedDag.entity = (String) tezDagEntity.get("entity"); + parsedDag.applicationId = (String) applicationIds.get(0); + parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status"); + return parsedDag; + } + + private HiveQueryId parseAtsHiveJob(JSONObject job) { + HiveQueryId parsedJob = new HiveQueryId(); + + parsedJob.entity = (String) job.get("entity"); + parsedJob.url = delegate.hiveQueryIdDirectUrl((String) job.get("entity")); + parsedJob.starttime = ((Long) job.get("starttime")); + + JSONObject primaryfilters = (JSONObject) job.get("primaryfilters"); + JSONArray operationIds = (JSONArray) primaryfilters.get("operationid"); + if (operationIds != null) { + parsedJob.operationId = (String) (operationIds).get(0); + } + JSONArray users = (JSONArray) primaryfilters.get("user"); + if (users != null) { + parsedJob.user = (String) (users).get(0); + } + + JSONObject lastEvent = getLastEvent(job); + long lastEventTimestamp = ((Long) lastEvent.get("timestamp")); + + parsedJob.duration = (lastEventTimestamp - parsedJob.starttime) / MillisInSecond; + + JSONObject otherinfo = (JSONObject) job.get("otherinfo"); + if (otherinfo.get("QUERY") != null) { // workaround for HIVE-10829 + JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY")); + + 
parsedJob.query = (String) query.get("queryText"); + JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS"); + + List<String> dagIds = new LinkedList<String>(); + List<JSONObject> stagesList = new LinkedList<JSONObject>(); + + for (Object key : stages.keySet()) { + JSONObject stage = (JSONObject) stages.get(key); + if (stage.get("Tez") != null) { + String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagId:"); + dagIds.add(dagId); + } + stagesList.add(stage); + } + parsedJob.dagNames = dagIds; + parsedJob.stages = stagesList; + } + + if (otherinfo.get("VERSION") != null) { + parsedJob.version = (Long) otherinfo.get("VERSION"); + } + return parsedJob; + } + + private TezVertexId parseVertex(JSONObject vertex) { + TezVertexId tezVertexId = new TezVertexId(); + tezVertexId.entity = (String)vertex.get("entity"); + JSONObject otherinfo = (JSONObject)vertex.get("otherinfo"); + if (otherinfo != null) + tezVertexId.vertexName = (String)otherinfo.get("vertexName"); + return tezVertexId; + } + + private JSONObject getLastEvent(JSONObject atsEntity) { + JSONArray events = (JSONArray) atsEntity.get("events"); + return (JSONObject) events.get(0); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java new file mode 100644 index 0000000..343202e --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs.atsJobs; + +import org.apache.ambari.view.ViewContext; + +public class ATSParserFactory { + + private ViewContext context; + + public ATSParserFactory(ViewContext context) { + this.context = context; + } + + public ATSParser getATSParser() { + ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl(context)); + return new ATSParser(delegate); + } + + public static String getATSUrl(ViewContext context) { + return context.getProperties().get("yarn.ats.url"); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java new file mode 100644 index 0000000..dac42aa --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.jobs.atsJobs; + +import org.json.simple.JSONObject; + +public interface ATSRequestsDelegate { + String hiveQueryIdDirectUrl(String entity); + + String hiveQueryIdOperationIdUrl(String operationId); + + String tezDagDirectUrl(String entity); + + String tezDagNameUrl(String name); + + String tezVerticesListForDAGUrl(String dagId); + + JSONObject hiveQueryIdsForUser(String username); + + JSONObject hiveQueryIdByOperationId(String operationId); + + JSONObject tezDagByName(String name); + + JSONObject tezVerticesListForDAG(String dagId); + + JSONObject tezDagByEntity(String entity); + + JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime); + + JSONObject hiveQueryEntityByEntityId(String hiveEntityId); +}
