Repository: ambari Updated Branches: refs/heads/trunk 6994fc7b0 -> 5a93dfd4a
http://git-wip-us.apache.org/repos/asf/ambari/blob/5a93dfd4/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java.orig ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java.orig b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java.orig deleted file mode 100644 index ad46e33..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java.orig +++ /dev/null @@ -1,476 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.ViewResourceHandler; -import org.apache.ambari.view.hive.BaseService; -import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController; -import org.apache.ambari.view.hive.client.Connection; -import org.apache.ambari.view.hive.client.Cursor; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.*; -import org.apache.ambari.view.hive.utils.*; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.*; -import java.io.*; -import java.lang.reflect.InvocationTargetException; -import java.util.*; -import java.util.concurrent.Callable; - -/** - * Servlet for queries - * API: - * GET /:id - * read job - * POST / - * create new job - * Required: title, queryFile - * GET / - * get all Jobs of current user - */ -public class JobService extends BaseService { - @Inject - ViewResourceHandler handler; - - protected JobResourceManager resourceManager; - private IOperationHandleResourceManager opHandleResourceManager; - protected final static Logger LOG = - LoggerFactory.getLogger(JobService.class); - private Aggregator aggregator; - - protected synchronized JobResourceManager getResourceManager() { - if (resourceManager == null) { - SharedObjectsFactory connectionsFactory = getSharedObjectsFactory(); - resourceManager = new JobResourceManager(connectionsFactory, context); - } - return resourceManager; - } - - protected IOperationHandleResourceManager getOperationHandleResourceManager() { - if (opHandleResourceManager == null) { - opHandleResourceManager = new OperationHandleResourceManager(getSharedObjectsFactory()); - } - return opHandleResourceManager; - } - - protected Aggregator getAggregator() { - if (aggregator == null) { - IATSParser atsParser = getSharedObjectsFactory().getATSParser(); - aggregator = new Aggregator(getResourceManager(), getOperationHandleResourceManager(), atsParser); - } - return aggregator; - } - - protected void setAggregator(Aggregator aggregator) { - this.aggregator = aggregator; - } - - /** - * Get single item - */ - @GET - @Path("{jobId}") - @Produces(MediaType.APPLICATION_JSON) - public Response getOne(@PathParam("jobId") String jobId) { - try { - JobController jobController = getResourceManager().readController(jobId); - - JSONObject jsonJob = jsonObjectFromJob(jobController); - - return Response.ok(jsonJob).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException { - Job hiveJob = jobController.getJobPOJO(); - - Job mergedJob; - try { - mergedJob = getAggregator().readATSJob(hiveJob); - } catch (ItemNotFound itemNotFound) { - throw new ServiceFormattedException("E010 Job not found", itemNotFound); - } - Map createdJobMap = PropertyUtils.describe(mergedJob); - createdJobMap.remove("class"); // no need to show Bean class on client - - JSONObject jobJson = new JSONObject(); - jobJson.put("job", createdJobMap); - return jobJson; - } - - /** - * Get job results in csv format - */ - @GET - @Path("{jobId}/results/csv") - @Produces("text/csv") - public Response getResultsCSV(@PathParam("jobId") String jobId, - @Context HttpServletResponse response, - @QueryParam("fileName") String fileName, - @QueryParam("columns") final String requestedColumns) { - try { - JobController jobController = getResourceManager().readController(jobId); - final Cursor resultSet = jobController.getResults(); - resultSet.selectColumns(requestedColumns); - - StreamingOutput stream = new StreamingOutput() { - @Override - public void write(OutputStream os) throws IOException, WebApplicationException { - Writer writer = new BufferedWriter(new OutputStreamWriter(os)); - CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); - try { - - try { - csvPrinter.printRecord(resultSet.getHeadersRow().getRow()); - } catch (HiveClientException e) { - LOG.error("Error on reading results header", e); - } - - while (resultSet.hasNext()) { - csvPrinter.printRecord(resultSet.next().getRow()); - writer.flush(); - } - } finally { - writer.close(); - } - } - }; - - if (fileName == null || fileName.isEmpty()) { - fileName = "results.csv"; - } - - return Response.ok(stream). - header("Content-Disposition", String.format("attachment; filename=\"%s\"", fileName)). - build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get job results in csv format - */ - @GET - @Path("{jobId}/results/csv/saveToHDFS") - @Produces(MediaType.APPLICATION_JSON) - public Response getResultsToHDFS(@PathParam("jobId") String jobId, - @QueryParam("commence") String commence, - @QueryParam("file") final String targetFile, - @QueryParam("stop") final String stop, - @QueryParam("columns") final String requestedColumns, - @Context HttpServletResponse response) { - try { - final JobController jobController = getResourceManager().readController(jobId); - - String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId()); - if (commence != null && commence.equals("true")) { - if (targetFile == null) - throw new MisconfigurationFormattedException("targetFile should not be empty"); - BackgroundJobController.getInstance(context).startJob(String.valueOf(backgroundJobId), new Runnable() { - @Override - public void run() { - - try { - Cursor resultSet = jobController.getResults(); - resultSet.selectColumns(requestedColumns); - - FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true); - Writer writer = new BufferedWriter(new OutputStreamWriter(stream)); - CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); - try { - while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) { - csvPrinter.printRecord(resultSet.next().getRow()); - writer.flush(); - } - } finally { - writer.close(); - } - stream.close(); - - } catch (IOException e) { - throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e); - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException("E020 Job results are expired", itemNotFound); - } - - } - }); - } - - if (stop != null && stop.equals("true")) { - BackgroundJobController.getInstance(context).interrupt(backgroundJobId); - } - - JSONObject object = new JSONObject(); - object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId)); - object.put("jobId", jobController.getJob().getId()); - object.put("backgroundJobId", backgroundJobId); - object.put("operationType", "CSV2HDFS"); - object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString()); - - return Response.ok(object).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get next results page - */ - @GET - @Path("{jobId}/results") - @Produces(MediaType.APPLICATION_JSON) - public Response getResults(@PathParam("jobId") String jobId, - @QueryParam("first") String fromBeginning, - @QueryParam("count") Integer count, - @QueryParam("searchId") String searchId, - @QueryParam("format") String format, - @QueryParam("columns") final String requestedColumns) { - try { - final JobController jobController = getResourceManager().readController(jobId); - if (!jobController.hasResults()) { - return ResultsPaginationController.emptyResponse().build(); - } - - return ResultsPaginationController.getInstance(context) - .request(jobId, searchId, true, fromBeginning, count, format, - new Callable<Cursor>() { - @Override - public Cursor call() throws Exception { - Cursor cursor = jobController.getResults(); - cursor.selectColumns(requestedColumns); - return cursor; - } - }).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Renew expiration time for results - */ - @GET - @Path("{jobId}/results/keepAlive") - public Response keepAliveResults(@PathParam("jobId") String jobId, - @QueryParam("first") String fromBeginning, - @QueryParam("count") Integer count) { - try { - if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) { - throw new NotFoundFormattedException("Results already expired", null); - } - return Response.ok().build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get progress info - */ - @GET - @Path("{jobId}/progress") - @Produces(MediaType.APPLICATION_JSON) - public Response getProgress(@PathParam("jobId") String jobId) { - try { - final JobController jobController = getResourceManager().readController(jobId); - - ProgressRetriever.Progress progress = new ProgressRetriever(jobController.getJob(), getSharedObjectsFactory()). - getProgress(); - - return Response.ok(progress).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Delete single item - */ - @DELETE - @Path("{id}") - public Response delete(@PathParam("id") String id, - @QueryParam("remove") final String remove) { - try { - JobController jobController; - try { - jobController = getResourceManager().readController(id); - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } - jobController.cancel(); - if (remove != null && remove.compareTo("true") == 0) { - getResourceManager().delete(id); - } - return Response.status(204).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Get all Jobs - */ - @GET - @Produces(MediaType.APPLICATION_JSON) - public Response getList() { - try { - LOG.debug("Getting all job"); - List<Job> allJobs = getAggregator().readAll(context.getUsername()); - for(Job job : allJobs) { - job.setSessionTag(null); - } - - JSONObject object = new JSONObject(); - object.put("jobs", allJobs); - return Response.ok(object).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Create job - */ - @POST - @Consumes(MediaType.APPLICATION_JSON) - public Response create(JobRequest request, @Context HttpServletResponse response, - @Context UriInfo ui) { - try { - Map jobInfo = PropertyUtils.describe(request.job); - Job job = new JobImpl(jobInfo); - getResourceManager().create(job); - - JobController createdJobController = getResourceManager().readController(job.getId()); - createdJobController.submit(); - getResourceManager().saveIfModified(createdJobController); - - response.setHeader("Location", - String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId())); - - JSONObject jobObject = jsonObjectFromJob(createdJobController); - - return Response.ok(jobObject).status(201).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Invalidate session - */ - @DELETE - @Path("sessions/{sessionTag}") - public Response invalidateSession(@PathParam("sessionTag") String sessionTag) { - try { - Connection connection = getSharedObjectsFactory().getHiveConnection(); - connection.invalidateSessionByTag(sessionTag); - return Response.ok().build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Session status - */ - @GET - @Path("sessions/{sessionTag}") - @Produces(MediaType.APPLICATION_JSON) - public Response sessionStatus(@PathParam("sessionTag") String sessionTag) { - try { - Connection connection = getSharedObjectsFactory().getHiveConnection(); - - JSONObject session = new JSONObject(); - session.put("sessionTag", sessionTag); - try { - connection.getSessionByTag(sessionTag); - session.put("actual", true); - } catch (HiveClientException ex) { - session.put("actual", false); - } - - JSONObject status = new JSONObject(); - status.put("session", session); - return Response.ok(status).build(); - } catch (WebApplicationException ex) { - throw ex; - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - } - - /** - * Wrapper object for json mapping - */ - public static class JobRequest { - public JobImpl job; - } -}
