http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/files/FileService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/files/FileService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/files/FileService.java deleted file mode 100644 index 4e1d24f..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/files/FileService.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.files; - -import com.jayway.jsonpath.JsonPath; -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.ViewResourceHandler; -import org.apache.ambari.view.hive.BaseService; -import org.apache.ambari.view.hive.utils.*; -import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.ambari.view.utils.hdfs.HdfsUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.util.HashMap; -import org.apache.ambari.view.commons.hdfs.UserService; - -/** - * File access resource - * API: - * GET /:path - * read entire file - * POST / - * create new file - * Required: filePath - * file should not already exists - * PUT /:path - * update file content - */ -public class FileService extends BaseService { - public static final String FAKE_FILE = "fakefile://"; - public static final String JSON_PATH_FILE = "jsonpath:"; - - @Inject - ViewResourceHandler handler; - - protected final static Logger LOG = - LoggerFactory.getLogger(FileService.class); - - /** - * Get single item - */ - @GET - @Path("{filePath:.*}") - @Produces(MediaType.APPLICATION_JSON) - public Response getFilePage(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException { - - LOG.debug("Reading file " + filePath); - try { - FileResource file = new FileResource(); - - if (page == null) - page 
= 0L; - - if (filePath.startsWith(FAKE_FILE)) { - if (page > 1) - throw new IllegalArgumentException("There's only one page in fake files"); - - String encodedContent = filePath.substring(FAKE_FILE.length()); - String content = new String(Base64.decodeBase64(encodedContent)); - - fillFakeFileObject(filePath, file, content); - } else if (filePath.startsWith(JSON_PATH_FILE)) { - if (page > 1) - throw new IllegalArgumentException("There's only one page in fake files"); - - String content = getJsonPathContentByUrl(filePath); - fillFakeFileObject(filePath, file, content); - } else { - - filePath = sanitizeFilePath(filePath); - FilePaginator paginator = new FilePaginator(filePath, getSharedObjectsFactory().getHdfsApi()); - - fillRealFileObject(filePath, page, file, paginator); - } - - JSONObject object = new JSONObject(); - object.put("file", file); - return Response.ok(object).status(200).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (FileNotFoundException ex) { - throw new NotFoundFormattedException(ex.getMessage(), ex); - } catch (IllegalArgumentException ex) { - throw new BadRequestFormattedException(ex.getMessage(), ex); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - protected String getJsonPathContentByUrl(String filePath) throws IOException { - URL url = new URL(filePath.substring(JSON_PATH_FILE.length())); - - InputStream responseInputStream = context.getURLStreamProvider().readFrom(url.toString(), "GET", - (String)null, new HashMap<String, String>()); - String response = IOUtils.toString(responseInputStream); - - for (String ref : url.getRef().split("!")) { - response = JsonPath.read(response, ref); - } - return response; - } - - public void fillRealFileObject(String filePath, Long page, FileResource file, FilePaginator paginator) throws IOException, InterruptedException { - file.setFilePath(filePath); - file.setFileContent(paginator.readPage(page)); - 
file.setHasNext(paginator.pageCount() > page + 1); - file.setPage(page); - file.setPageCount(paginator.pageCount()); - } - - public void fillFakeFileObject(String filePath, FileResource file, String content) { - file.setFilePath(filePath); - file.setFileContent(content); - file.setHasNext(false); - file.setPage(0); - file.setPageCount(1); - } - - /** - * Delete single item - */ - @DELETE - @Path("{filePath:.*}") - public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException { - try { - filePath = sanitizeFilePath(filePath); - LOG.debug("Deleting file " + filePath); - if (getSharedObjectsFactory().getHdfsApi().delete(filePath, false)) { - return Response.status(204).build(); - } - throw new NotFoundFormattedException("FileSystem.delete returned false", null); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Update item - */ - @PUT - @Path("{filePath:.*}") - @Consumes(MediaType.APPLICATION_JSON) - public Response updateFile(FileResourceRequest request, - @PathParam("filePath") String filePath) throws IOException, InterruptedException { - try { - filePath = sanitizeFilePath(filePath); - LOG.debug("Rewriting file " + filePath); - FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(filePath, true); - output.writeBytes(request.file.getFileContent()); - output.close(); - return Response.status(204).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Create script - */ - @POST - @Consumes(MediaType.APPLICATION_JSON) - public Response createFile(FileResourceRequest request, - @Context HttpServletResponse response, @Context UriInfo ui) - throws IOException, InterruptedException { - try { - LOG.debug("Creating file " + request.file.getFilePath()); - try { - FSDataOutputStream output = 
getSharedObjectsFactory().getHdfsApi().create(request.file.getFilePath(), false); - if (request.file.getFileContent() != null) { - output.writeBytes(request.file.getFileContent()); - } - output.close(); - } catch (FileAlreadyExistsException ex) { - throw new ServiceFormattedException("F020 File already exists", ex, 400); - } - response.setHeader("Location", - String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.getFilePath())); - return Response.status(204).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Checks connection to HDFS - * @param context View Context - */ - public static void hdfsSmokeTest(ViewContext context) { - try { - HdfsApi api = HdfsUtil.connectToHDFSApi(context); - api.getStatus(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Checks connection to User HomeDirectory - * @param context View Context - */ - public static void userhomeSmokeTest(ViewContext context) { - try { - UserService userservice = new UserService(context); - userservice.homeDir(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Wrapper object for json mapping - */ - public static class FileResourceRequest { - public FileResource file; - } - - private String sanitizeFilePath(String filePath){ - if (!filePath.startsWith("/") && !filePath.startsWith(".")) { - filePath = "/" + filePath; // some servers strip double slashes in URL - } - return filePath; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java deleted file mode 100644 index 2f0138a..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java +++ /dev/null @@ -1,417 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy; -import org.apache.ambari.view.hive.persistence.utils.Indexed; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy; -import org.apache.ambari.view.hive.resources.IResourceManager; -import org.apache.ambari.view.hive.resources.files.FileService; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.HiveQueryId; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezDagId; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobInfo; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.Hex; -import org.apache.hive.service.cli.thrift.TOperationHandle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - -/** - * View Jobs and ATS Jobs aggregator. 
- * There are 4 options: - * 1) ATS Job without operationId - * *Meaning*: executed outside of HS2 - * - Job info only from ATS - * 2) ATS Job with operationId - * a) Hive View Job with same operationId is not present - * *Meaning*: executed with HS2 - * - Job info only from ATS - * b) Hive View Job with operationId is present (need to merge) - * *Meaning*: executed with HS2 through Hive View - * - Job info merged from ATS and from Hive View DataStorage - * 3) Job present only in Hive View, ATS does not have it - * *Meaning*: executed through Hive View, but Hadoop Job was not created - * it can happen if user executes query without aggregation, like just "select * from TABLE" - * - Job info only from Hive View - */ -public class Aggregator { - protected final static Logger LOG = - LoggerFactory.getLogger(Aggregator.class); - - private final IATSParser ats; - private final IOperationHandleResourceManager operationHandleResourceManager; - private IResourceManager<Job> viewJobResourceManager; - - public Aggregator(IResourceManager<Job> jobResourceManager, - IOperationHandleResourceManager operationHandleResourceManager, - IATSParser ats) { - this.viewJobResourceManager = jobResourceManager; - this.operationHandleResourceManager = operationHandleResourceManager; - this.ats = ats; - } - - /** - * gets all the jobs for 'username' where the job submission time is between 'startTime' (inclusive) - * and endTime (exclusive). - * Fetches the jobs from ATS and DB merges and update DB. returns the combined list. - * - * @param username: username for which jobs have to be fetched. 
- * @param startTime: inclusive, time in secs from epoch - * @param endTime: exclusive, time in secs from epoch - * @return: list of jobs - */ - public List<Job> readAllForUserByTime(String username, long startTime, long endTime) { - List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime); - List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); - List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime); - allJobs.addAll(dbOnlyJobs); - - return allJobs; - } - - /** - * fetches the new state of jobs from ATS and from DB. Does merging/updating as required. - * @param jobInfos: infos of job to get - * @return: list of updated Job - */ - public List<Job> readJobsByIds(List<JobInfo> jobInfos) { - //categorize jobs - List<String> jobsWithHiveIds = new LinkedList<>(); - List<String> dbOnlyJobs = new LinkedList<>(); - - for (JobInfo jobInfo : jobInfos) { - if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) { - dbOnlyJobs.add(jobInfo.getJobId()); - } else { - jobsWithHiveIds.add(jobInfo.getHiveId()); - } - } - - List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds); - List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList); - List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs); - - allJobs.addAll(dbJobs); - return allJobs; - } - - /** - * gets the jobs from the Database given their id - * @param jobsIds: list of ids of jobs - * @return: list of all the jobs found - */ - private List<Job> readJobsFromDbByJobId(List<String> jobsIds) { - List<Job> jobs = new LinkedList<>(); - for (final String jid : jobsIds) { - try { - Job job = getJobFromDbByJobId(jid); - jobs.add(job); - } catch (ItemNotFound itemNotFound) { - LOG.error("Error while finding job with id : {}", jid, itemNotFound); - } - } - - return jobs; - } - - /** - * fetches the job from DB given its id - * @param jobId: the id of the job to fetch - * @return: the job - * @throws ItemNotFound: if job with given 
id is not found in db - */ - private Job getJobFromDbByJobId(final String jobId) throws ItemNotFound { - if (null == jobId) - return null; - - List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - return item.getId().equals(jobId); - } - - @Override - public String whereStatement() { - return "id = '" + jobId + "'"; // even IDs are string - } - }); - - if (null != jobs && !jobs.isEmpty()) - return jobs.get(0); - - throw new ItemNotFound(String.format("Job with id %s not found.", jobId)); - } - - /** - * returns the job which is associated with the given (guid) - * @param optId: the operationId for which the job needs to be fetched. - * @return: the job - * @throws ItemNotFound: if no job was found to be associated with given operationId (guid) or if no - * StoredOperationHandle found with guid optId. - */ - private Job getJobFromDbByOperationId(final String optId) throws ItemNotFound { - StoredOperationHandle handle = getStoredOperationHandleByGuid(optId); - Job job = operationHandleResourceManager.getJobByHandle(handle); - return job; - } - - /** - * returns the StoredOperationHandle with given id - * @param optId: id of the StoredOperationHandle - * @return: StoredOperationHandle - * @throws ItemNotFound: if no StoredOperationHandle found for given guid (operationId). 
- */ - private StoredOperationHandle getStoredOperationHandleByGuid(final String optId) throws ItemNotFound { - LOG.debug("stored procedure with operation id : {} in DB", optId); - List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readAll(new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - StoredOperationHandle soh = (StoredOperationHandle) item; - return soh.getGuid().equals(optId); - } - - @Override - public String whereStatement() { - return " guid = '" + optId + "'"; - } - }); - - if (null != operationHandles && !operationHandles.isEmpty()) { - return operationHandles.get(0); - } - - throw new ItemNotFound(String.format("Stored operation handle with id %s not found", optId)); - } - - /** - * returns all the jobs from ATS and DB (for this instance) for the given user. - * @param username - * @return - */ - public List<Job> readAll(String username) { - List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username); - LOG.debug("HiveQueryIds fetched : {}", queries); - List<Job> allJobs = fetchDagsAndMergeJobs(queries); - List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null); - LOG.debug("Jobs only present in DB: {}", dbOnlyJobs); - allJobs.addAll(dbOnlyJobs); - return allJobs; - } - - /** - * reads all the jobs from DB for username and excludes the jobs mentioned in queries list - * @param username : username for which the jobs are to be read. 
- * @param queries : the jobs to exclude - * @param startTime: can be null, if not then the window start time for job - * @param endTime: can be null, if not then the window end time for job - * @return : the jobs in db that are not in the queries - */ - private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) { - List<Job> dbOnlyJobs = new LinkedList<Job>(); - HashMap<String, String> operationIdVsHiveId = new HashMap<>(); - - for (HiveQueryId hqid : queries) { - operationIdVsHiveId.put(hqid.operationId, hqid.entity); - } - LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId); - //cover case when operationId is present, but not exists in ATS - //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE" - List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username)); - for (Job job : jobs) { - if (null != startTime && null != endTime && null != job.getDateSubmitted() - && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime) - ) { - continue; // don't include this in the result - } - - List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readJobRelatedHandles(job); - - if (operationHandles.size() > 0) { - StoredOperationHandle operationHandle = operationHandles.get(0); - - if (!operationIdVsHiveId.containsKey(hexStringToUrlSafeBase64(operationHandle.getGuid()))) { - //e.g. 
query without hadoop job: select * from table - dbOnlyJobs.add(job); - } - } - } - - return dbOnlyJobs; - } - - private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) { - List<Job> allJobs = new LinkedList<Job>(); - - for (HiveQueryId atsHiveQuery : queries) { - JobImpl atsJob = null; - if (hasOperationId(atsHiveQuery)) { - try { - Job viewJob = getJobFromDbByOperationId(urlSafeBase64ToHexString(atsHiveQuery.operationId)); - TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); - atsJob = mergeHiveAtsTez(atsHiveQuery, atsTezDag, viewJob); - } catch (ItemNotFound itemNotFound) { - LOG.error("Ignore : {}", itemNotFound.getMessage()); - continue; - } - } else { - TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); - atsJob = atsOnlyJob(atsHiveQuery, atsTezDag); - } - - atsJob.setHiveQueryId(atsHiveQuery.entity); - allJobs.add(atsJob); - } - - return allJobs; - } - - /** - * @param atsHiveQuery - * @param atsTezDag - * @param viewJob - * @return - */ - private JobImpl mergeHiveAtsTez(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) throws ItemNotFound { - saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob); - return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); - } - - public Job readATSJob(Job viewJob) throws ItemNotFound { - TOperationHandle operationHandle = operationHandleResourceManager.getHandleForJob(viewJob).toTOperationHandle(); - - String hexGuid = Hex.encodeHexString(operationHandle.getOperationId().getGuid()); - HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexStringToUrlSafeBase64(hexGuid)); - - TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery); - - saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob); - return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob); - } - - private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) { - TezDagId atsTezDag; - if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) { - atsTezDag = 
ats.getTezDAGByEntity(atsHiveQuery.entity); - } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) { - String dagName = atsHiveQuery.dagNames.get(0); - - atsTezDag = ats.getTezDAGByName(dagName); - } else { - atsTezDag = new TezDagId(); - } - return atsTezDag; - } - - protected boolean hasOperationId(HiveQueryId atsHiveQuery) { - return atsHiveQuery.operationId != null; - } - - protected JobImpl mergeAtsJobWithViewJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) { - JobImpl atsJob; - try { - atsJob = new JobImpl(PropertyUtils.describe(viewJob)); - } catch (IllegalAccessException e) { - LOG.error("Can't instantiate JobImpl", e); - return null; - } catch (InvocationTargetException e) { - LOG.error("Can't instantiate JobImpl", e); - return null; - } catch (NoSuchMethodException e) { - LOG.error("Can't instantiate JobImpl", e); - return null; - } - fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag); - return atsJob; - } - - protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound { - boolean shouldUpdate = false; - if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) { - if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) { - viewJob.setDagName(hiveQueryId.dagNames.get(0)); - shouldUpdate = true; - } - } - if (tezDagId.status != null && (tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) && - !viewJob.getStatus().equals(tezDagId.status)) { - viewJob.setDagId(tezDagId.entity); - viewJob.setStatus(tezDagId.status); - shouldUpdate = true; - } - - if (shouldUpdate) { - viewJobResourceManager.update(viewJob, viewJob.getId()); - } - } - - protected JobImpl atsOnlyJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag) { - JobImpl atsJob = new JobImpl(); - atsJob.setId(atsHiveQuery.entity); - fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag); - - String query = atsHiveQuery.query; - atsJob.setTitle(query.substring(0, (query.length() > 42) ? 
42 : query.length())); - - atsJob.setQueryFile(FileService.JSON_PATH_FILE + atsHiveQuery.url + "#otherinfo.QUERY!queryText"); - return atsJob; - } - - protected JobImpl fillAtsJobFields(JobImpl atsJob, HiveQueryId atsHiveQuery, TezDagId atsTezDag) { - atsJob.setApplicationId(atsTezDag.applicationId); - - if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) - atsJob.setDagName(atsHiveQuery.dagNames.get(0)); - atsJob.setDagId(atsTezDag.entity); - if (atsTezDag.status != null && !atsTezDag.status.equals(TezDagId.STATUS_UNKNOWN)) - atsJob.setStatus(atsTezDag.status); - if (atsHiveQuery.starttime != 0) - atsJob.setDateSubmitted(atsHiveQuery.starttime); - atsJob.setDuration(atsHiveQuery.duration); - return atsJob; - } - - protected static String urlSafeBase64ToHexString(String urlsafeBase64) { - byte[] decoded = Base64.decodeBase64(urlsafeBase64); - - StringBuilder sb = new StringBuilder(); - for (byte b : decoded) { - sb.append(String.format("%02x", b)); - } - return sb.toString(); - } - - protected static String hexStringToUrlSafeBase64(String hexString) { - byte[] decoded = new byte[hexString.length() / 2]; - - for (int i = 0; i < hexString.length(); i += 2) { - decoded[i / 2] = (byte) Integer.parseInt(String.format("%c%c", hexString.charAt(i), hexString.charAt(i + 1)), 16); - } - return Base64.encodeBase64URLSafeString(decoded); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ConnectionController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ConnectionController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ConnectionController.java deleted file mode 100644 index 92cf67d..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ConnectionController.java +++ /dev/null @@ 
-1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.client.Connection; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.utils.HiveClientFormattedException; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.commons.codec.binary.Hex; -import org.apache.hive.service.cli.thrift.TOperationHandle; -import org.apache.hive.service.cli.thrift.TSessionHandle; - - -public class ConnectionController { - private OperationHandleControllerFactory operationHandleControllerFactory; - private Connection connection; - - public ConnectionController(OperationHandleControllerFactory operationHandleControllerFactory, Connection connection) { - this.connection = connection; - this.operationHandleControllerFactory = operationHandleControllerFactory; - } - - public TSessionHandle getSessionByTag(String tag) throws HiveClientException { - return connection.getSessionByTag(tag); - } - - public String openSession() { - try { - TSessionHandle sessionHandle = connection.openSession(); - return getTagBySession(sessionHandle); - } catch 
(HiveClientException e) { - throw new HiveClientFormattedException(e); - } - } - - public static String getTagBySession(TSessionHandle sessionHandle) { - return Hex.encodeHexString(sessionHandle.getSessionId().getGuid()); - } - - public void selectDatabase(TSessionHandle session, String database) { - try { - connection.executeSync(session, "use " + database + ";"); - } catch (HiveClientException e) { - throw new HiveClientFormattedException(e); - } - } - - public OperationHandleController executeQuery(TSessionHandle session, String cmd) { - TOperationHandle operationHandle = null; - try { - operationHandle = connection.executeAsync(session, cmd); - } catch (HiveClientException e) { - throw new HiveClientFormattedException(e); - } - StoredOperationHandle storedOperationHandle = StoredOperationHandle.buildFromTOperationHandle(operationHandle); - return operationHandleControllerFactory.createControllerForHandle(storedOperationHandle); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/FileResourceShort.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/FileResourceShort.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/FileResourceShort.java deleted file mode 100644 index 776bf64..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/FileResourceShort.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.commons.beanutils.BeanUtils; - -import java.lang.reflect.InvocationTargetException; -import java.util.Map; - -public class FileResourceShort { - public FileResourceShort() {} - public FileResourceShort(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException { - BeanUtils.populate(this, stringObjectMap); - } - - private Integer id; - private String path; - - public FileResourceShort(Integer id, String path) { - this.id = id; - this.path = path; - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } - - public String getPath() { - return path; - } - - public void setPath(String path) { - this.path = path; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/IOperationHandleResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/IOperationHandleResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/IOperationHandleResourceManager.java deleted file mode 100644 index 961d6c2..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/IOperationHandleResourceManager.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.IResourceManager; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.hive.service.cli.thrift.TOperationHandle; - -import java.util.List; - -public interface IOperationHandleResourceManager extends IResourceManager<StoredOperationHandle> { - List<StoredOperationHandle> readJobRelatedHandles(Job job); - - List<Job> getHandleRelatedJobs(StoredOperationHandle operationHandle); - - Job getJobByHandle(StoredOperationHandle handle) throws ItemNotFound; - - void putHandleForJob(TOperationHandle h, Job job); - - boolean containsHandleForJob(Job job); - - StoredOperationHandle getHandleForJob(Job job) throws ItemNotFound; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java 
deleted file mode 100644 index 2aa491e..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.*; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.*; -import org.apache.ambari.view.hive.utils.SharedObjectsFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Resource provider for job - */ -public class JobResourceProvider implements ResourceProvider<Job> { - @Inject - ViewContext context; - - protected JobResourceManager resourceManager = null; - protected final static Logger LOG = - LoggerFactory.getLogger(JobResourceProvider.class); - - protected synchronized JobResourceManager getResourceManager() { - if (resourceManager == null) { - 
resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context); - } - return resourceManager; - } - - @Override - public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { - try { - return getResourceManager().read(resourceId); - } catch (ItemNotFound itemNotFound) { - throw new NoSuchResourceException(resourceId); - } - } - - @Override - public Set<Job> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { - if (context == null) { - return new HashSet<Job>(); - } - return new HashSet<Job>(getResourceManager().readAll( - new OnlyOwnersFilteringStrategy(this.context.getUsername()))); - } - - @Override - public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException { - Job item = null; - try { - item = new JobImpl(stringObjectMap); - } catch (InvocationTargetException e) { - throw new SystemException("error on creating resource", e); - } catch (IllegalAccessException e) { - throw new SystemException("error on creating resource", e); - } - getResourceManager().create(item); - JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item); - jobController.submit(); - } - - @Override - public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { - Job item = null; - try { - item = new JobImpl(stringObjectMap); - } catch (InvocationTargetException e) { - throw new SystemException("error on updating resource", e); - } catch (IllegalAccessException e) { - throw new SystemException("error on updating resource", e); - } - try { - getResourceManager().update(item, resourceId); - } catch (ItemNotFound itemNotFound) { - throw new 
NoSuchResourceException(resourceId); - } - return true; - } - - @Override - public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { - try { - getResourceManager().delete(resourceId); - } catch (ItemNotFound itemNotFound) { - throw new NoSuchResourceException(resourceId); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java deleted file mode 100644 index 36c2633..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java +++ /dev/null @@ -1,609 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.ViewResourceHandler; -import org.apache.ambari.view.hive.BaseService; -import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController; -import org.apache.ambari.view.hive.client.Connection; -import org.apache.ambari.view.hive.client.Cursor; -import org.apache.ambari.view.hive.client.HiveAuthCredentials; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.client.UserLocalConnection; -import org.apache.ambari.view.hive.client.UserLocalHiveAuthCredentials; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobController; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobInfo; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobResourceManager; -import org.apache.ambari.view.hive.utils.MisconfigurationFormattedException; -import org.apache.ambari.view.hive.utils.NotFoundFormattedException; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.ambari.view.hive.utils.SharedObjectsFactory; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import 
javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; -import javax.ws.rs.core.UriInfo; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.lang.reflect.InvocationTargetException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - -/** - * Servlet for queries - * API: - * GET /:id - * read job - * POST / - * create new job - * Required: title, queryFile - * GET / - * get all Jobs of current user - */ -public class JobService extends BaseService { - @Inject - ViewResourceHandler handler; - - private JobResourceManager resourceManager; - private IOperationHandleResourceManager opHandleResourceManager; - private UserLocalConnection connectionLocal = new UserLocalConnection(); - - protected final static Logger LOG = - LoggerFactory.getLogger(JobService.class); - private Aggregator aggregator; - - protected synchronized JobResourceManager getResourceManager() { - if (resourceManager == null) { - SharedObjectsFactory connectionsFactory = getSharedObjectsFactory(); - resourceManager = new JobResourceManager(connectionsFactory, context); - } - return resourceManager; - } - - protected IOperationHandleResourceManager getOperationHandleResourceManager() { - if (opHandleResourceManager == null) { - opHandleResourceManager = new OperationHandleResourceManager(getSharedObjectsFactory()); - } - return opHandleResourceManager; - } - - protected Aggregator getAggregator() { - if (aggregator == null) { - IATSParser atsParser = getSharedObjectsFactory().getATSParser(); - aggregator = new Aggregator(getResourceManager(), getOperationHandleResourceManager(), atsParser); - } - return aggregator; - } - - protected void setAggregator(Aggregator aggregator) { - this.aggregator = aggregator; - } - - /** - * Get 
single item - */ - @GET - @Path("{jobId}") - @Produces(MediaType.APPLICATION_JSON) - public Response getOne(@PathParam("jobId") String jobId) { - try { - JobController jobController = getResourceManager().readController(jobId); - - JSONObject jsonJob = jsonObjectFromJob(jobController); - - return Response.ok(jsonJob).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException { - Job hiveJob = jobController.getJobPOJO(); - - Job mergedJob; - try { - mergedJob = getAggregator().readATSJob(hiveJob); - } catch (ItemNotFound itemNotFound) { - throw new ServiceFormattedException("E010 Job not found", itemNotFound); - } - Map createdJobMap = PropertyUtils.describe(mergedJob); - createdJobMap.remove("class"); // no need to show Bean class on client - - JSONObject jobJson = new JSONObject(); - jobJson.put("job", createdJobMap); - return jobJson; - } - - /** - * Get job results in csv format - */ - @GET - @Path("{jobId}/results/csv") - @Produces("text/csv") - public Response getResultsCSV(@PathParam("jobId") String jobId, - @Context HttpServletResponse response, - @QueryParam("fileName") String fileName, - @QueryParam("columns") final String requestedColumns) { - try { - JobController jobController = getResourceManager().readController(jobId); - final Cursor resultSet = jobController.getResults(); - resultSet.selectColumns(requestedColumns); - - StreamingOutput stream = new StreamingOutput() { - @Override - public void write(OutputStream os) throws IOException, WebApplicationException { - Writer writer = new BufferedWriter(new OutputStreamWriter(os)); - CSVPrinter csvPrinter = new CSVPrinter(writer, 
CSVFormat.DEFAULT); - try { - - try { - csvPrinter.printRecord(resultSet.getHeadersRow().getRow()); - } catch (HiveClientException e) { - LOG.error("Error on reading results header", e); - } - - while (resultSet.hasNext()) { - csvPrinter.printRecord(resultSet.next().getRow()); - writer.flush(); - } - } finally { - writer.close(); - } - } - }; - - if (fileName == null || fileName.isEmpty()) { - fileName = "results.csv"; - } - - return Response.ok(stream). - header("Content-Disposition", String.format("attachment; filename=\"%s\"", fileName)). - build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get job results in csv format - */ - @GET - @Path("{jobId}/results/csv/saveToHDFS") - @Produces(MediaType.APPLICATION_JSON) - public Response getResultsToHDFS(@PathParam("jobId") String jobId, - @QueryParam("commence") String commence, - @QueryParam("file") final String targetFile, - @QueryParam("stop") final String stop, - @QueryParam("columns") final String requestedColumns, - @Context HttpServletResponse response) { - try { - final JobController jobController = getResourceManager().readController(jobId); - - String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId()); - if (commence != null && commence.equals("true")) { - if (targetFile == null) - throw new MisconfigurationFormattedException("targetFile should not be empty"); - BackgroundJobController.getInstance(context).startJob(String.valueOf(backgroundJobId), new Runnable() { - @Override - public void run() { - - try { - Cursor resultSet = jobController.getResults(); - resultSet.selectColumns(requestedColumns); - - FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true); - Writer writer = new BufferedWriter(new 
OutputStreamWriter(stream)); - CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); - try { - while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) { - csvPrinter.printRecord(resultSet.next().getRow()); - writer.flush(); - } - } finally { - writer.close(); - } - stream.close(); - - } catch (IOException e) { - throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e); - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException("E020 Job results are expired", itemNotFound); - } - - } - }); - } - - if (stop != null && stop.equals("true")) { - BackgroundJobController.getInstance(context).interrupt(backgroundJobId); - } - - JSONObject object = new JSONObject(); - object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId)); - object.put("jobId", jobController.getJob().getId()); - object.put("backgroundJobId", backgroundJobId); - object.put("operationType", "CSV2HDFS"); - object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString()); - - return Response.ok(object).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - - @Path("{jobId}/status") - @GET - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public Response fetchJobStatus(@PathParam("jobId") String jobId) throws ItemNotFound, HiveClientException, NoOperationStatusSetException { - JobController jobController = getResourceManager().readController(jobId); - String jobStatus = jobController.getStatus().status; - 
LOG.info("jobStatus : {} for jobId : {}",jobStatus, jobId); - - JSONObject jsonObject = new JSONObject(); - jsonObject.put("jobStatus", jobStatus); - jsonObject.put("jobId", jobId); - - return Response.ok(jsonObject).build(); - } - - /** - * Get next results page - */ - @GET - @Path("{jobId}/results") - @Produces(MediaType.APPLICATION_JSON) - public Response getResults(@PathParam("jobId") String jobId, - @QueryParam("first") String fromBeginning, - @QueryParam("count") Integer count, - @QueryParam("searchId") String searchId, - @QueryParam("format") String format, - @QueryParam("columns") final String requestedColumns) { - try { - final JobController jobController = getResourceManager().readController(jobId); - LOG.info("jobController.getStatus().status : " + jobController.getStatus().status + " for job : " + jobController.getJob().getId()); - if(jobController.getStatus().status.equals(Job.JOB_STATE_INITIALIZED) - || jobController.getStatus().status.equals(Job.JOB_STATE_PENDING) - || jobController.getStatus().status.equals(Job.JOB_STATE_RUNNING) - || jobController.getStatus().status.equals(Job.JOB_STATE_UNKNOWN)){ - - return Response.status(Response.Status.SERVICE_UNAVAILABLE).header("Retry-After","1").build(); - } - if (!jobController.hasResults()) { - return ResultsPaginationController.emptyResponse().build(); - } - - return ResultsPaginationController.getInstance(context) - .request(jobId, searchId, true, fromBeginning, count, format, - new Callable<Cursor>() { - @Override - public Cursor call() throws Exception { - Cursor cursor = jobController.getResults(); - cursor.selectColumns(requestedColumns); - return cursor; - } - }).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Renew expiration time for results - */ - @GET - 
@Path("{jobId}/results/keepAlive") - public Response keepAliveResults(@PathParam("jobId") String jobId, - @QueryParam("first") String fromBeginning, - @QueryParam("count") Integer count) { - try { - if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) { - throw new NotFoundFormattedException("Results already expired", null); - } - return Response.ok().build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get progress info - */ - @GET - @Path("{jobId}/progress") - @Produces(MediaType.APPLICATION_JSON) - public Response getProgress(@PathParam("jobId") String jobId) { - try { - final JobController jobController = getResourceManager().readController(jobId); - - ProgressRetriever.Progress progress = new ProgressRetriever(jobController.getJob(), getSharedObjectsFactory()). - getProgress(); - - return Response.ok(progress).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Delete single item - */ - @DELETE - @Path("{id}") - public Response delete(@PathParam("id") String id, - @QueryParam("remove") final String remove) { - try { - JobController jobController; - try { - jobController = getResourceManager().readController(id); - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } - jobController.cancel(); - if (remove != null && remove.compareTo("true") == 0) { - getResourceManager().delete(id); - } - return Response.status(204).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), 
itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get all Jobs - */ - @GET - @Produces(MediaType.APPLICATION_JSON) - public List<Job> getList(@QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) { - try { - - LOG.debug("Getting all job: startTime: {}, endTime: {}",startTime,endTime); - List<Job> allJobs = getAggregator().readAllForUserByTime(context.getUsername(),startTime, endTime); - for(Job job : allJobs) { - job.setSessionTag(null); - } - - return allJobs; - } catch (WebApplicationException ex) { - LOG.error("Exception occured while fetching all jobs.", ex); - throw ex; - } catch (Exception ex) { - LOG.error("Exception occured while fetching all jobs.", ex); - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * fetch the jobs with given info. - * provide as much info about the job so that next api can optimize the fetch process. - * @param jobInfos - * @return - */ - @Path("/getList") - @POST - @Produces(MediaType.APPLICATION_JSON) - @Consumes(MediaType.APPLICATION_JSON) - public List<Job> getList(List<JobInfo> jobInfos) { - try { - LOG.debug("fetching jobs with ids :{}", jobInfos); - List<Job> allJobs = getAggregator().readJobsByIds(jobInfos); - for(Job job : allJobs) { - job.setSessionTag(null); - } - - return allJobs; - } catch (WebApplicationException ex) { - LOG.error("Exception occured while fetching all jobs.", ex); - throw ex; - } catch (Exception ex) { - LOG.error("Exception occured while fetching all jobs.", ex); - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Create job - */ - @POST - @Consumes(MediaType.APPLICATION_JSON) - public Response create(JobRequest request, @Context HttpServletResponse response, - @Context UriInfo ui) { - try { - Map jobInfo = PropertyUtils.describe(request.job); - Job job = new JobImpl(jobInfo); - getResourceManager().create(job); - - JobController createdJobController = 
getResourceManager().readController(job.getId()); - createdJobController.submit(); - getResourceManager().saveIfModified(createdJobController); - - response.setHeader("Location", - String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId())); - - JSONObject jobObject = jsonObjectFromJob(createdJobController); - - return Response.ok(jobObject).status(201).build(); - } catch (WebApplicationException ex) { - LOG.error("Error occurred while creating job : ",ex); - throw ex; - } catch (ItemNotFound itemNotFound) { - LOG.error("Error occurred while creating job : ",itemNotFound); - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - LOG.error("Error occurred while creating job : ",ex); - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Set password and connect to Hive - */ - @POST - @Path("auth") - @Consumes(MediaType.APPLICATION_JSON) - public Response setupPassword(AuthRequest request) { - try { - HiveAuthCredentials authCredentials = new HiveAuthCredentials(); - authCredentials.setPassword(request.password); - new UserLocalHiveAuthCredentials().set(authCredentials, context); - - connectionLocal.remove(context); // force reconnect on next get - connectionLocal.get(context); - return Response.ok().status(200).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Remove connection credentials - */ - @DELETE - @Path("auth") - public Response removePassword() { - try { - new UserLocalHiveAuthCredentials().remove(context); - connectionLocal.remove(context); // force reconnect on next get - return Response.ok().status(200).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - - /** - * Invalidate session - */ - @DELETE - @Path("sessions/{sessionTag}") - public 
Response invalidateSession(@PathParam("sessionTag") String sessionTag) { - try { - Connection connection = connectionLocal.get(context); - connection.invalidateSessionByTag(sessionTag); - return Response.ok().build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Session status - */ - @GET - @Path("sessions/{sessionTag}") - @Produces(MediaType.APPLICATION_JSON) - public Response sessionStatus(@PathParam("sessionTag") String sessionTag) { - try { - Connection connection = connectionLocal.get(context); - - JSONObject session = new JSONObject(); - session.put("sessionTag", sessionTag); - try { - connection.getSessionByTag(sessionTag); - session.put("actual", true); - } catch (HiveClientException ex) { - session.put("actual", false); - } - - JSONObject status = new JSONObject(); - status.put("session", session); - return Response.ok(status).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Wrapper object for json mapping - */ - public static class JobRequest { - public JobImpl job; - } - - /** - * Wrapper for authentication json mapping - */ - public static class AuthRequest { - public String password; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java deleted file mode 100644 index 54f6757..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import java.util.LinkedHashSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class LogParser { - public static final Pattern HADOOP_MR_APPS_RE = Pattern.compile("(http[^\\s]*/proxy/([a-z0-9_]+?)/)"); - public static final Pattern HADOOP_TEZ_APPS_RE = Pattern.compile("\\(Executing on YARN cluster with App id ([a-z0-9_]+?)\\)"); - private LinkedHashSet<AppId> appsList; - - private LogParser() {} - - public static LogParser parseLog(String logs) { - LogParser parser = new LogParser(); - - parser.setAppsList(parseApps(logs, parser)); - return parser; - } - - public static LinkedHashSet<AppId> parseApps(String logs, LogParser parser) { - LinkedHashSet<AppId> mrAppIds = getMRAppIds(logs); - LinkedHashSet<AppId> tezAppIds = getTezAppIds(logs); - - LinkedHashSet<AppId> appIds = new LinkedHashSet<AppId>(); - appIds.addAll(mrAppIds); - appIds.addAll(tezAppIds); - - return appIds; - } - - private static LinkedHashSet<AppId> getMRAppIds(String logs) { - Matcher m = HADOOP_MR_APPS_RE.matcher(logs); - LinkedHashSet<AppId> list = new LinkedHashSet<AppId>(); - while (m.find()) { - AppId applicationInfo = new AppId(); - 
applicationInfo.setTrackingUrl(m.group(1)); - applicationInfo.setIdentifier(m.group(2)); - list.add(applicationInfo); - } - return list; - } - - private static LinkedHashSet<AppId> getTezAppIds(String logs) { - Matcher m = HADOOP_TEZ_APPS_RE.matcher(logs); - LinkedHashSet<AppId> list = new LinkedHashSet<AppId>(); - while (m.find()) { - AppId applicationInfo = new AppId(); - applicationInfo.setTrackingUrl(""); - applicationInfo.setIdentifier(m.group(1)); - list.add(applicationInfo); - } - return list; - } - - public void setAppsList(LinkedHashSet<AppId> appsList) { - this.appsList = appsList; - } - - public LinkedHashSet<AppId> getAppsList() { - return appsList; - } - - public AppId getLastAppInList() { - Object[] appIds = appsList.toArray(); - if (appIds.length == 0) { - return null; - } - return (AppId) appIds[appsList.size()-1]; - } - - public static class AppId { - private String trackingUrl; - private String identifier; - - public String getTrackingUrl() { - return trackingUrl; - } - - public void setTrackingUrl(String trackingUrl) { - this.trackingUrl = trackingUrl; - } - - public String getIdentifier() { - return identifier; - } - - public void setIdentifier(String identifier) { - this.identifier = identifier; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof AppId)) return false; - - AppId appId = (AppId) o; - - if (!identifier.equals(appId.identifier)) return false; - - return true; - } - - @Override - public int hashCode() { - return identifier.hashCode(); - } - } - - public static class EmptyAppId extends AppId { - @Override - public String getTrackingUrl() { - return ""; - } - - @Override - public String getIdentifier() { - return ""; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationDelegate.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationDelegate.java deleted file mode 100644 index 8b48c97..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationDelegate.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -public interface ModifyNotificationDelegate { - boolean onModification(Object object); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationInvocationHandler.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationInvocationHandler.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationInvocationHandler.java deleted file mode 100644 index 0552a8c..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ModifyNotificationInvocationHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; - -public class ModifyNotificationInvocationHandler implements InvocationHandler { - private Object proxied; - private ModifyNotificationDelegate modifyDelegate; - - public ModifyNotificationInvocationHandler(Object proxied, ModifyNotificationDelegate delegate) { - this.proxied = proxied; - this.modifyDelegate = delegate; - } - - @Override - public Object invoke(Object o, Method method, Object[] args) throws Throwable { - if (method.getName().startsWith("set")) { - modifyDelegate.onModification(proxied); - } - return method.invoke(proxied, args); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/NoOperationStatusSetException.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/NoOperationStatusSetException.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/NoOperationStatusSetException.java deleted file mode 100644 index 020c63e..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/NoOperationStatusSetException.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - - -public class NoOperationStatusSetException extends Exception { -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java deleted file mode 100644 index faf02b0..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.view.hive.resources.jobs;


import org.apache.ambari.view.ViewContext;
import org.apache.ambari.view.hive.client.Cursor;
import org.apache.ambari.view.hive.client.HiveClientException;
import org.apache.ambari.view.hive.client.UserLocalConnection;
import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job;
import org.apache.ambari.view.hive.utils.HiveClientFormattedException;
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wraps one Hive operation handle: translates thrift operation state into
 * view job states, and exposes cancel/log/result access for the operation.
 */
public class OperationHandleController {
  private final static Logger LOG =
      LoggerFactory.getLogger(OperationHandleController.class);
  // Thrift form of the handle, rebuilt once from the stored copy below.
  private final TOperationHandle operationHandle;
  private ViewContext context;
  // Persisted representation of the same handle (see StoredOperationHandle).
  private final StoredOperationHandle storedOperationHandle;
  private final IOperationHandleResourceManager operationHandlesStorage;

  // Per-user Hive connection; looked up through the view context on each call.
  protected UserLocalConnection connectionLocal = new UserLocalConnection();

  /**
   * @param context                 view context used to resolve the user's Hive connection
   * @param storedOperationHandle   persisted handle this controller manages
   * @param operationHandlesStorage storage used by {@link #persistHandleForJob(Job)}
   */
  public OperationHandleController(ViewContext context, StoredOperationHandle storedOperationHandle,
                                   IOperationHandleResourceManager operationHandlesStorage) {
    this.context = context;
    this.storedOperationHandle = storedOperationHandle;
    this.operationHandle = storedOperationHandle.toTOperationHandle();
    this.operationHandlesStorage = operationHandlesStorage;
  }

  public StoredOperationHandle getStoredOperationHandle() {
    return storedOperationHandle;
  }

  /**
   * Queries HiveServer2 for the operation's current status and maps the
   * thrift operation state onto the view's Job state constants.
   *
   * @return status with job-state string, SQL state and error message
   * @throws NoOperationStatusSetException if the response carries no
   *         operation state, or a state with no Job mapping
   * @throws HiveClientException on client communication failure
   */
  public OperationStatus getOperationStatus() throws NoOperationStatusSetException, HiveClientException {
    TGetOperationStatusResp statusResp = connectionLocal.get(context).getOperationStatus(operationHandle);

    if (!statusResp.isSetOperationState()) {
      throw new NoOperationStatusSetException();
    }

    OperationStatus opStatus = new OperationStatus();

    opStatus.sqlState = statusResp.getSqlState();
    opStatus.message = statusResp.getErrorMessage();

    switch (statusResp.getOperationState()) {
      case INITIALIZED_STATE:
        opStatus.status = Job.JOB_STATE_INITIALIZED;
        break;
      case RUNNING_STATE:
        opStatus.status = Job.JOB_STATE_RUNNING;
        break;
      case FINISHED_STATE:
        opStatus.status = Job.JOB_STATE_FINISHED;
        break;
      case CANCELED_STATE:
        opStatus.status = Job.JOB_STATE_CANCELED;
        break;
      case CLOSED_STATE:
        opStatus.status = Job.JOB_STATE_CLOSED;
        break;
      case ERROR_STATE:
        opStatus.status = Job.JOB_STATE_ERROR;
        break;
      // "UKNOWN" misspelling comes from the Hive thrift TOperationState enum
      // itself; do not "fix" it here.
      case UKNOWN_STATE:
        opStatus.status = Job.JOB_STATE_UNKNOWN;
        break;
      case PENDING_STATE:
        opStatus.status = Job.JOB_STATE_PENDING;
        break;
      default:
        throw new NoOperationStatusSetException();
    }

    return opStatus;
  }

  /**
   * Cancels the operation on HiveServer2; client errors are rethrown as the
   * view's formatted runtime exception.
   */
  public void cancel() {
    try {
      connectionLocal.get(context).cancelOperation(operationHandle);
    } catch (HiveClientException e) {
      throw new HiveClientFormattedException(e);
    }
  }

  /** Associates this handle with {@code job} in persistent storage. */
  public void persistHandleForJob(Job job) {
    operationHandlesStorage.putHandleForJob(operationHandle, job);
  }

  /**
   * Fetches the operation's log text.  If logs are not yet available the
   * client throws HiveClientFormattedException; that is treated as a normal
   * condition: an empty string is returned and the event logged at INFO.
   */
  public String getLogs() {
    String logs;
    try {
      logs = connectionLocal.get(context).getLogs(operationHandle);
    } catch (HiveClientFormattedException ex) {
      logs = "";
      LOG.info(String.format("Logs are not available yet for job #%s [%s]\n%s",
          storedOperationHandle.getJobId(), storedOperationHandle.getGuid(), ex.toString()));
    }
    return logs;
  }

  /** @return cursor over the operation's result set */
  public Cursor getResults() {
    return connectionLocal.get(context).getResults(operationHandle);
  }

  /** @return true if the operation produced a result set */
  public boolean hasResults() {
    return operationHandle.isHasResultSet();
  }

  /** Plain value holder for the mapped status triple. */
  public static class OperationStatus {
    public String status;    // one of the Job.JOB_STATE_* constants
    public String sqlState;  // SQLSTATE reported by HiveServer2, may be null
    public String message;   // error message reported by HiveServer2, may be null
  }
}
---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java deleted file mode 100644 index fe1b01a..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive.utils.SharedObjectsFactory; - -public class OperationHandleControllerFactory { - private IOperationHandleResourceManager operationHandlesStorage; - private ViewContext context; - - public OperationHandleControllerFactory(ViewContext context, SharedObjectsFactory storageFactory) { - this.context = context; - this.operationHandlesStorage = new OperationHandleResourceManager(storageFactory); - } - - public OperationHandleController createControllerForHandle(StoredOperationHandle storedOperationHandle) { - return new OperationHandleController(context, storedOperationHandle, operationHandlesStorage); - } - - public OperationHandleController getHandleForJob(Job job) throws ItemNotFound { - StoredOperationHandle handle = operationHandlesStorage.getHandleForJob(job); - return createControllerForHandle(handle); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0f9621f/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java deleted file mode 100644 index c53cad5..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.persistence.IStorageFactory; -import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy; -import org.apache.ambari.view.hive.persistence.utils.Indexed; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.SharedCRUDResourceManager; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.hive.service.cli.thrift.TOperationHandle; - -import java.util.LinkedList; -import java.util.List; - -public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle> - implements IOperationHandleResourceManager { - /** - * Constructor - */ - public OperationHandleResourceManager(IStorageFactory storageFabric) { - super(StoredOperationHandle.class, storageFabric); - } - - @Override - public List<StoredOperationHandle> readJobRelatedHandles(final Job job) { - return storageFactory.getStorage().loadAll(StoredOperationHandle.class, new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - 
StoredOperationHandle handle = (StoredOperationHandle) item; - return (handle.getJobId() != null && handle.getJobId().equals(job.getId())); - } - - @Override - public String whereStatement() { - return "jobId = '" + job.getId() + "'"; - } - }); - } - - @Override - public StoredOperationHandle getHandleForJob(Job job) throws ItemNotFound { - List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job); - if (jobRelatedHandles.size() == 0) - throw new ItemNotFound(); - return jobRelatedHandles.get(0); - } - - @Override - public List<Job> getHandleRelatedJobs(final StoredOperationHandle operationHandle) { - List<JobImpl> list = storageFactory.getStorage().loadAll(JobImpl.class, new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - Job job = (Job) item; - return (job.getId() != null && job.getId().equals(operationHandle.getJobId())); - } - - @Override - public String whereStatement() { - return "id = '" + operationHandle.getJobId() + "'"; - } - }); - - if(null == list) - return null; - - List<Job> jobs = new LinkedList<Job>(list); - return jobs; - } - - @Override - public Job getJobByHandle(StoredOperationHandle handle) throws ItemNotFound { - List<Job> handleRelatedJobs = getHandleRelatedJobs(handle); - if (handleRelatedJobs.size() == 0) - throw new ItemNotFound(String.format("Job not found for operationId %s", handle)); - return handleRelatedJobs.get(0); - } - - @Override - public void putHandleForJob(TOperationHandle h, Job job) { - StoredOperationHandle handle = StoredOperationHandle.buildFromTOperationHandle(h); - handle.setJobId(job.getId()); - - List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job); - if (jobRelatedHandles.size() > 0) { - handle.setId(jobRelatedHandles.get(0).getId()); // update existing - try { - update(handle, jobRelatedHandles.get(0).getId()); - } catch (ItemNotFound itemNotFound) { - throw new ServiceFormattedException("E050 Error when updating operation handle: " + 
itemNotFound.toString(), itemNotFound); - } - } else { - create(handle); - } - } - - @Override - public boolean containsHandleForJob(Job job) { - List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job); - return jobRelatedHandles.size() > 0; - } -}
