http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java deleted file mode 100644 index a9f315c..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java +++ /dev/null @@ -1,326 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.*; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery; -import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager; -import org.apache.ambari.view.hive.utils.*; -import org.apache.ambari.view.hive.utils.HdfsApi; -import org.apache.ambari.view.hive.utils.HdfsUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Proxy; -import java.text.SimpleDateFormat; -import java.util.*; - -public class JobControllerImpl implements JobController, ModifyNotificationDelegate { - private final static Logger LOG = - LoggerFactory.getLogger(JobControllerImpl.class); - - private ViewContext context; - private Job jobUnproxied; - private Job job; - private boolean modified; - - private OperationHandleControllerFactory operationHandleControllerFactory; - private ConnectionController hiveSession; - private SavedQueryResourceManager savedQueryResourceManager; - - /** - * JobController constructor - * Warning: Create JobControllers ONLY using JobControllerFactory! 
- */ - public JobControllerImpl(ViewContext context, Job job) { - this.context = context; - setJobPOJO(job); - operationHandleControllerFactory = OperationHandleControllerFactory.getInstance(context); - hiveSession = ConnectionController.getInstance(context); - savedQueryResourceManager = SavedQueryResourceManager.getInstance(context); - } - - public String getQueryForJob() { - FilePaginator paginator = new FilePaginator(job.getQueryFile(), context); - String query; - try { - query = paginator.readPage(0); //warning - reading only 0 page restricts size of query to 1MB - } catch (IOException e) { - throw new ServiceFormattedException("Error when reading file: " + e.toString(), e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("Error when reading file: " + e.toString(), e); - } - return query; - } - - private static final String DEFAULT_DB = "default"; - public String getJobDatabase() { - if (job.getDataBase() != null) { - return job.getDataBase(); - } else { - return DEFAULT_DB; - } - } - - @Override - public void submit() { - setupHiveBeforeQueryExecute(); - - String query = getQueryForJob(); - OperationHandleController handleController = hiveSession.executeQuery(query); - - handleController.persistHandleForJob(job); - } - - private void setupHiveBeforeQueryExecute() { - String database = getJobDatabase(); - hiveSession.selectDatabase(database); - } - - @Override - public void cancel() throws ItemNotFound { - OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job); - handle.cancel(); - } - - @Override - public void onRead() { - updateOperationStatus(); - updateOperationLogs(); - - updateJobDuration(); - } - - public void updateOperationStatus() { - try { - - OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job); - String status = handle.getOperationStatus(); - job.setStatus(status); - LOG.debug("Status of job#" + job.getId() + " is " + job.getStatus()); - - } catch 
(NoOperationStatusSetException e) { - LOG.info("Operation state is not set for job#" + job.getId()); - - } catch (HiveErrorStatusException e) { - LOG.debug("Error updating status for job#" + job.getId() + ": " + e.getMessage()); - job.setStatus(Job.JOB_STATE_UNKNOWN); - - } catch (HiveClientException e) { - throw new ServiceFormattedException("Could not fetch job status " + job.getId(), e); - - } catch (ItemNotFound itemNotFound) { - LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't update status"); - } - } - - public void updateOperationLogs() { - try { - OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job); - String logs = handle.getLogs(); - -// LogParser info = LogParser.parseLog(logs); - - String logFilePath = job.getLogFile(); - HdfsUtil.putStringToFile(context, logFilePath, logs); - - } catch (HiveClientRuntimeException ex) { - LOG.error("Error while fetching logs: " + ex.getMessage()); - } catch (ItemNotFound itemNotFound) { - LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't read logs"); - } - } - - public boolean isJobEnded() { - String status = job.getStatus(); - return status.equals(Job.JOB_STATE_FINISHED) || status.equals(Job.JOB_STATE_CANCELED) || - status.equals(Job.JOB_STATE_CLOSED) || status.equals(Job.JOB_STATE_ERROR) || - status.equals(Job.JOB_STATE_UNKNOWN); // Unknown is not finished, but polling makes no sense - } - - @Override - public Job getJob() { - return job; - } - - /** - * Use carefully. 
Returns unproxied bean object - * @return unproxied bean object - */ - @Override - public Job getJobPOJO() { - return jobUnproxied; - } - - public void setJobPOJO(Job jobPOJO) { - Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(), - new Class[]{Job.class}, - new ModifyNotificationInvocationHandler(jobPOJO, this)); - this.job = jobModifyNotificationProxy; - - this.jobUnproxied = jobPOJO; - } - - @Override - public Cursor getResults() throws ItemNotFound { - OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job); - return handle.getResults(); - } - - @Override - public void afterCreation() { - setupStatusDirIfNotPresent(); - setupQueryFileIfNotPresent(); - setupLogFileIfNotPresent(); - - setCreationDate(); - } - - public void setupLogFileIfNotPresent() { - if (job.getLogFile() == null || job.getLogFile().isEmpty()) { - setupLogFile(); - } - } - - public void setupQueryFileIfNotPresent() { - if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) { - setupQueryFile(); - } - } - - public void setupStatusDirIfNotPresent() { - if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) { - setupStatusDir(); - } - } - - private static final long MillisInSecond = 1000L; - - public void updateJobDuration() { - job.setDuration(System.currentTimeMillis() / MillisInSecond - job.getDateSubmitted()); - } - - public void setCreationDate() { - job.setDateSubmitted(System.currentTimeMillis() / MillisInSecond); - } - - - - private void setupLogFile() { - LOG.debug("Creating log file for job#" + job.getId()); - - String logFile = job.getStatusDir() + "/" + "logs"; - HdfsUtil.putStringToFile(context, logFile, ""); - - job.setLogFile(logFile); - LOG.debug("Log file for job#" + job.getId() + ": " + logFile); - } - - private void setupStatusDir() { - String newDirPrefix = makeStatusDirectoryPrefix(); - String newDir = HdfsUtil.findUnallocatedFileName(context, newDirPrefix, ""); - - 
job.setStatusDir(newDir); - LOG.debug("Status dir for job#" + job.getId() + ": " + newDir); - } - - private String makeStatusDirectoryPrefix() { - String userScriptsPath = context.getProperties().get("jobs.dir"); - - if (userScriptsPath == null) { // TODO: move check to initialization code - String msg = "jobs.dir is not configured!"; - LOG.error(msg); - throw new MisconfigurationFormattedException("jobs.dir"); - } - - String normalizedName = String.format("hive-job-%d", job.getId()); - String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date()); - return String.format(userScriptsPath + - "/%s-%s", normalizedName, timestamp); - } - - private void setupQueryFile() { - String statusDir = job.getStatusDir(); - assert statusDir != null : "setupStatusDir() should be called first"; - - String jobQueryFilePath = statusDir + "/" + "query.hql"; - - try { - - if (job.getForcedContent() != null) { - - HdfsUtil.putStringToFile(context, jobQueryFilePath, job.getForcedContent()); - job.setForcedContent(""); // prevent forcedContent to be written to DB - - } - else if (job.getQueryId() != null) { - - String savedQueryFile = getRelatedSavedQueryFile(); - HdfsApi.getInstance(context).copy(savedQueryFile, jobQueryFilePath); - job.setQueryFile(jobQueryFilePath); - - } else { - - throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null); - - } - - } catch (IOException e) { - throw new ServiceFormattedException("Error in creation: " + e.toString(), e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("Error in creation: " + e.toString(), e); - } - job.setQueryFile(jobQueryFilePath); - - LOG.debug("Query file for job#" + job.getId() + ": " + jobQueryFilePath); - } - - private String getRelatedSavedQueryFile() { - SavedQuery savedQuery; - try { - savedQuery = savedQueryResourceManager.read(job.getQueryId()); - } catch (ItemNotFound itemNotFound) { - throw new BadRequestFormattedException("queryId not found!", 
itemNotFound); - } - return savedQuery.getQueryFile(); - } - - @Override - public boolean onModification(Object object) { - setModified(true); - return true; - } - - @Override - public boolean isModified() { - return modified; - } - - public void setModified(boolean modified) { - this.modified = modified; - } - - @Override - public void clearModified() { - setModified(false); - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java deleted file mode 100644 index 7d65957..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.commons.beanutils.PropertyUtils; - -import java.lang.reflect.InvocationTargetException; -import java.util.Map; - -/** - * Bean to represent saved query - */ -public class JobImpl implements Job { - private String title = null; - private String queryFile = null; - private String statusDir = null; - private Long dateSubmitted = 0L; - private Long duration = 0L; - private String status = JOB_STATE_UNKNOWN; - private String forcedContent = null; - private String dataBase = null; - private Integer queryId = null; - - private Integer id = null; - private String owner = null; - - private String logFile; - private String confFile; - - public JobImpl() {} - public JobImpl(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException { - for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) { - try { - PropertyUtils.setProperty(this, entry.getKey(), entry.getValue()); - } catch (NoSuchMethodException e) { - //do nothing, skip - } - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Job)) return false; - - JobImpl job = (JobImpl) o; - - if (id != null ? !id.equals(job.id) : job.id != null) return false; - - return true; - } - - @Override - public int hashCode() { - return id != null ? 
id.hashCode() : 0; - } - - @Override - public Integer getId() { - return id; - } - - @Override - public void setId(Integer id) { - this.id = id; - } - - @Override - public String getOwner() { - return owner; - } - - @Override - public void setOwner(String owner) { - this.owner = owner; - } - - @Override - public String getTitle() { - return title; - } - - @Override - public void setTitle(String title) { - this.title = title; - } - - @Override - public String getQueryFile() { - return queryFile; - } - - @Override - public void setQueryFile(String queryFile) { - this.queryFile = queryFile; - } - - @Override - public Long getDateSubmitted() { - return dateSubmitted; - } - - @Override - public void setDateSubmitted(Long dateSubmitted) { - this.dateSubmitted = dateSubmitted; - } - - @Override - public Long getDuration() { - return duration; - } - - @Override - public void setDuration(Long duration) { - this.duration = duration; - } - - @Override - public String getStatus() { - return status; - } - - @Override - public void setStatus(String status) { - this.status = status; - } - - @Override - public String getForcedContent() { - return forcedContent; - } - - @Override - public void setForcedContent(String forcedContent) { - this.forcedContent = forcedContent; - } - - @Override - public Integer getQueryId() { - return queryId; - } - - @Override - public void setQueryId(Integer queryId) { - this.queryId = queryId; - } - - @Override - public String getStatusDir() { - return statusDir; - } - - @Override - public void setStatusDir(String statusDir) { - this.statusDir = statusDir; - } - - @Override - public String getDataBase() { - return dataBase; - } - - @Override - public void setDataBase(String dataBase) { - this.dataBase = dataBase; - } - - @Override - public String getLogFile() { - return logFile; - } - - @Override - public void setLogFile(String logFile) { - this.logFile = logFile; - } - - @Override - public String getConfFile() { - return confFile; - } - - @Override - 
public void setConfFile(String confFile) { - this.confFile = confFile; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java deleted file mode 100644 index 139b29a..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.*; -import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.PersonalCRUDResourceManager; -import org.apache.ambari.view.hive.utils.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * Object that provides CRUD operations for query objects - */ -public class JobResourceManager extends PersonalCRUDResourceManager<Job> { - private final static Logger LOG = - LoggerFactory.getLogger(JobResourceManager.class); - - private JobControllerFactory jobControllerFactory; - - /** - * Constructor - * @param context View Context instance - */ - public JobResourceManager(ViewContext context) { - super(JobImpl.class, context); - jobControllerFactory = JobControllerFactory.getInstance(context); - } - - @Override - public Job create(Job object) { - super.create(object); - JobController jobController = jobControllerFactory.createControllerForJob(object); - - try { - - jobController.afterCreation(); - saveIfModified(jobController); - - } catch (ServiceFormattedException e) { - cleanupAfterErrorAndThrowAgain(object, e); - } - - return object; - } - - private void saveIfModified(JobController jobController) { - if (jobController.isModified()) { - save(jobController.getJobPOJO()); - jobController.clearModified(); - } - } - - - @Override - public Job read(Integer id) throws ItemNotFound { - Job job = super.read(id); - JobController jobController = jobControllerFactory.createControllerForJob(job); - jobController.onRead(); - saveIfModified(jobController); - return job; - } - - @Override - public List<Job> readAll(FilteringStrategy filteringStrategy) { - return super.readAll(filteringStrategy); - } - - @Override - public void delete(Integer resourceId) throws ItemNotFound { - 
super.delete(resourceId); - } - - public JobController readController(Integer id) throws ItemNotFound { - Job job = read(id); - return jobControllerFactory.createControllerForJob(job); - } - - public Cursor getJobResultsCursor(Job job) { - try { - JobController jobController = jobControllerFactory.createControllerForJob(job); - return jobController.getResults(); - } catch (ItemNotFound itemNotFound) { - throw new NotFoundFormattedException("Job results are expired", null); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java index 780921d..460278e 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java @@ -22,6 +22,8 @@ import com.google.inject.Inject; import org.apache.ambari.view.*; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.*; +import org.apache.ambari.view.hive.utils.SharedObjectsFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +45,7 @@ public class JobResourceProvider implements ResourceProvider<Job> { protected synchronized JobResourceManager getResourceManager() { if (resourceManager == null) { - resourceManager = new JobResourceManager(context); + resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context); } return resourceManager; } @@ -51,7 +53,7 @@ public class 
JobResourceProvider implements ResourceProvider<Job> { @Override public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { try { - return getResourceManager().read(Integer.valueOf(resourceId)); + return getResourceManager().read(resourceId); } catch (ItemNotFound itemNotFound) { throw new NoSuchResourceException(resourceId); } @@ -74,7 +76,7 @@ public class JobResourceProvider implements ResourceProvider<Job> { throw new SystemException("error on creating resource", e); } getResourceManager().create(item); - JobController jobController = JobControllerFactory.getInstance(context).createControllerForJob(item); + JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item); jobController.submit(); } @@ -89,7 +91,7 @@ public class JobResourceProvider implements ResourceProvider<Job> { throw new SystemException("error on updating resource", e); } try { - getResourceManager().update(item, Integer.valueOf(resourceId)); + getResourceManager().update(item, resourceId); } catch (ItemNotFound itemNotFound) { throw new NoSuchResourceException(resourceId); } @@ -99,7 +101,7 @@ public class JobResourceProvider implements ResourceProvider<Job> { @Override public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { try { - getResourceManager().delete(Integer.valueOf(resourceId)); + getResourceManager().delete(resourceId); } catch (ItemNotFound itemNotFound) { throw new NoSuchResourceException(resourceId); } http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java index 1c4f2a9..d9c69e8 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java @@ -24,7 +24,11 @@ import org.apache.ambari.view.hive.BaseService; import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController; import org.apache.ambari.view.hive.client.Cursor; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegate; +import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegateImpl; +import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSParser; +import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.*; import org.apache.ambari.view.hive.utils.*; import org.apache.ambari.view.hive.utils.HdfsApi; import org.apache.commons.beanutils.PropertyUtils; @@ -59,16 +63,25 @@ public class JobService extends BaseService { ViewResourceHandler handler; protected JobResourceManager resourceManager; + private IOperationHandleResourceManager opHandleResourceManager; protected final static Logger LOG = LoggerFactory.getLogger(JobService.class); protected synchronized JobResourceManager getResourceManager() { if (resourceManager == null) { - resourceManager = new JobResourceManager(context); + SharedObjectsFactory connectionsFactory = getSharedObjectsFactory(); + resourceManager = new JobResourceManager(connectionsFactory, context); } return resourceManager; } + private IOperationHandleResourceManager getOperationHandleResourceManager() { + if (opHandleResourceManager == null) { + opHandleResourceManager = new OperationHandleResourceManager(getSharedObjectsFactory()); + } 
+ return opHandleResourceManager; + } + /** * Get single item */ @@ -77,7 +90,7 @@ public class JobService extends BaseService { @Produces(MediaType.APPLICATION_JSON) public Response getOne(@PathParam("jobId") String jobId) { try { - JobController jobController = getResourceManager().readController(Integer.valueOf(jobId)); + JobController jobController = getResourceManager().readController(jobId); JSONObject jsonJob = jsonObjectFromJob(jobController); @@ -110,7 +123,7 @@ public class JobService extends BaseService { @Context HttpServletResponse response, @QueryParam("columns") final String requestedColumns) { try { - JobController jobController = getResourceManager().readController(Integer.valueOf(jobId)); + JobController jobController = getResourceManager().readController(jobId); final Cursor resultSet = jobController.getResults(); resultSet.selectColumns(requestedColumns); @@ -153,7 +166,7 @@ public class JobService extends BaseService { @QueryParam("columns") final String requestedColumns, @Context HttpServletResponse response) { try { - final JobController jobController = getResourceManager().readController(Integer.valueOf(jobId)); + final JobController jobController = getResourceManager().readController(jobId); String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId()); if (commence != null && commence.equals("true")) { @@ -167,7 +180,7 @@ public class JobService extends BaseService { Cursor resultSet = jobController.getResults(); resultSet.selectColumns(requestedColumns); - FSDataOutputStream stream = HdfsApi.getInstance(context).create(targetFile, true); + FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true); Writer writer = new BufferedWriter(new OutputStreamWriter(stream)); CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); try { @@ -225,7 +238,7 @@ public class JobService extends BaseService { @QueryParam("searchId") String searchId, @QueryParam("columns") final String 
requestedColumns) { try { - final JobController jobController = getResourceManager().readController(Integer.valueOf(jobId)); + final JobController jobController = getResourceManager().readController(jobId); return ResultsPaginationController.getInstance(context) .request(jobId, searchId, true, fromBeginning, count, @@ -276,13 +289,13 @@ public class JobService extends BaseService { try { JobController jobController; try { - jobController = getResourceManager().readController(Integer.valueOf(id)); + jobController = getResourceManager().readController(id); } catch (ItemNotFound itemNotFound) { throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound); } jobController.cancel(); if (remove != null && remove.compareTo("true") == 0) { - getResourceManager().delete(Integer.valueOf(id)); + getResourceManager().delete(id); } // getResourceManager().delete(Integer.valueOf(queryId)); return Response.status(204).build(); @@ -303,8 +316,10 @@ public class JobService extends BaseService { public Response getList() { try { LOG.debug("Getting all job"); - List allJobs = getResourceManager().readAll( - new OnlyOwnersFilteringStrategy(this.context.getUsername())); //TODO: move strategy to PersonalCRUDRM + ATSRequestsDelegate transport = new ATSRequestsDelegateImpl(context, "http://127.0.0.1:8188"); + IATSParser atsParser = new ATSParser(transport); + Aggregator aggregator = new Aggregator(getResourceManager(), getOperationHandleResourceManager(), atsParser); + List allJobs = aggregator.readAll(context.getUsername()); JSONObject object = new JSONObject(); object.put("jobs", allJobs); http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java index 090781c..3952491 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java @@ -23,29 +23,35 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; public class LogParser { - public static final Pattern HADOOP_MR_JOBS_RE = Pattern.compile("(http[^\\s]*/proxy/([a-z0-9_]+?)/)"); - public static final Pattern HADOOP_TEZ_JOBS_RE = Pattern.compile("\\(Executing on YARN cluster with App id ([a-z0-9_]+?)\\)"); - private LinkedHashSet<JobId> jobsList; + public static final Pattern HADOOP_MR_APPS_RE = Pattern.compile("(http[^\\s]*/proxy/([a-z0-9_]+?)/)"); + public static final Pattern HADOOP_TEZ_APPS_RE = Pattern.compile("\\(Executing on YARN cluster with App id ([a-z0-9_]+?)\\)"); + private LinkedHashSet<AppId> appsList; + + private LogParser() {} public static LogParser parseLog(String logs) { LogParser parser = new LogParser(); - LinkedHashSet<JobId> mrJobIds = getMRJobIds(logs); - LinkedHashSet<JobId> tezJobIds = getTezJobIds(logs); + parser.setAppsList(parseApps(logs, parser)); + return parser; + } - LinkedHashSet<JobId> jobIds = new LinkedHashSet<JobId>(); - jobIds.addAll(mrJobIds); - jobIds.addAll(tezJobIds); + public static LinkedHashSet<AppId> parseApps(String logs, LogParser parser) { + LinkedHashSet<AppId> mrAppIds = getMRAppIds(logs); + LinkedHashSet<AppId> tezAppIds = getTezAppIds(logs); - parser.setJobsList(jobIds); - return parser; + LinkedHashSet<AppId> appIds = new LinkedHashSet<AppId>(); + appIds.addAll(mrAppIds); + appIds.addAll(tezAppIds); + + return appIds; } - private static LinkedHashSet<JobId> getMRJobIds(String logs) { - Matcher m = HADOOP_MR_JOBS_RE.matcher(logs); - LinkedHashSet<JobId> list = new LinkedHashSet<JobId>(); + private static LinkedHashSet<AppId> getMRAppIds(String logs) { + Matcher m = 
HADOOP_MR_APPS_RE.matcher(logs); + LinkedHashSet<AppId> list = new LinkedHashSet<AppId>(); while (m.find()) { - JobId applicationInfo = new JobId(); + AppId applicationInfo = new AppId(); applicationInfo.setTrackingUrl(m.group(1)); applicationInfo.setIdentifier(m.group(2)); list.add(applicationInfo); @@ -53,27 +59,34 @@ public class LogParser { return list; } - private static LinkedHashSet<JobId> getTezJobIds(String logs) { - Matcher m = HADOOP_TEZ_JOBS_RE.matcher(logs); - LinkedHashSet<JobId> list = new LinkedHashSet<JobId>(); + private static LinkedHashSet<AppId> getTezAppIds(String logs) { + Matcher m = HADOOP_TEZ_APPS_RE.matcher(logs); + LinkedHashSet<AppId> list = new LinkedHashSet<AppId>(); while (m.find()) { - JobId applicationInfo = new JobId(); - applicationInfo.setTrackingUrl(null); + AppId applicationInfo = new AppId(); + applicationInfo.setTrackingUrl(""); applicationInfo.setIdentifier(m.group(1)); list.add(applicationInfo); } return list; } - public void setJobsList(LinkedHashSet<JobId> jobsList) { - this.jobsList = jobsList; + public void setAppsList(LinkedHashSet<AppId> appsList) { + this.appsList = appsList; + } + + public LinkedHashSet<AppId> getAppsList() { + return appsList; } - public LinkedHashSet<JobId> getJobsList() { - return jobsList; + public AppId getLastAppInList() { + Object[] appIds = appsList.toArray(); + if (appIds.length == 0) + return null; + return (AppId) appIds[appsList.size()-1]; } - public static class JobId { + public static class AppId { private String trackingUrl; private String identifier; @@ -96,11 +109,11 @@ public class LogParser { @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof JobId)) return false; + if (!(o instanceof AppId)) return false; - JobId jobId = (JobId) o; + AppId appId = (AppId) o; - if (!identifier.equals(jobId.identifier)) return false; + if (!identifier.equals(appId.identifier)) return false; return true; } @@ -110,4 +123,16 @@ public class LogParser { return 
identifier.hashCode(); } } + + public static class EmptyAppId extends AppId { + @Override + public String getTrackingUrl() { + return ""; + } + + @Override + public String getIdentifier() { + return ""; + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java index 551ebdd..e146d55 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java @@ -19,10 +19,10 @@ package org.apache.ambari.view.hive.resources.jobs; -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.ConnectionPool; import org.apache.ambari.view.hive.client.Cursor; import org.apache.ambari.view.hive.client.HiveClientException; +import org.apache.ambari.view.hive.client.IConnectionFactory; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; import org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.hive.service.cli.thrift.TGetOperationStatusResp; import org.apache.hive.service.cli.thrift.TOperationHandle; @@ -33,18 +33,16 @@ public class OperationHandleController { private final static Logger LOG = LoggerFactory.getLogger(OperationHandleController.class); - private ViewContext context; + private IConnectionFactory connectionsFabric; private TOperationHandle operationHandle; - private OperationHandleResourceManager operationHandlesStorage; + private IOperationHandleResourceManager operationHandlesStorage; - public 
OperationHandleController(ViewContext context, TOperationHandle storedOperationHandle, OperationHandleResourceManager operationHandlesStorage) { - this.context = context; + public OperationHandleController(IConnectionFactory connectionsFabric, TOperationHandle storedOperationHandle, IOperationHandleResourceManager operationHandlesStorage) { + this.connectionsFabric = connectionsFabric; this.operationHandle = storedOperationHandle; this.operationHandlesStorage = operationHandlesStorage; } - - public TOperationHandle getStoredOperationHandle() { return operationHandle; } @@ -54,7 +52,7 @@ public class OperationHandleController { } public String getOperationStatus() throws NoOperationStatusSetException, HiveClientException { - TGetOperationStatusResp statusResp = ConnectionPool.getConnection(context).getOperationStatus(operationHandle); + TGetOperationStatusResp statusResp = connectionsFabric.getHiveConnection().getOperationStatus(operationHandle); if (!statusResp.isSetOperationState()) { throw new NoOperationStatusSetException("Operation state is not set"); } @@ -93,7 +91,7 @@ public class OperationHandleController { public void cancel() { try { - ConnectionPool.getConnection(context).cancelOperation(operationHandle); + connectionsFabric.getHiveConnection().cancelOperation(operationHandle); } catch (HiveClientException e) { throw new ServiceFormattedException("Cancel failed: " + e.toString(), e); } @@ -104,10 +102,10 @@ public class OperationHandleController { } public String getLogs() { - return ConnectionPool.getConnection(context).getLogs(operationHandle); + return connectionsFabric.getHiveConnection().getLogs(operationHandle); } public Cursor getResults() { - return ConnectionPool.getConnection(context).getResults(operationHandle); + return connectionsFabric.getHiveConnection().getResults(operationHandle); } } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java index 5d4a8af..0310855 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java @@ -18,31 +18,22 @@ package org.apache.ambari.view.hive.resources.jobs; -import org.apache.ambari.view.ViewContext; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive.utils.SharedObjectsFactory; import org.apache.hive.service.cli.thrift.TOperationHandle; -import java.util.HashMap; -import java.util.Map; - public class OperationHandleControllerFactory { - private ViewContext context; - private OperationHandleResourceManager operationHandlesStorage; - - private OperationHandleControllerFactory(ViewContext context) { - this.context = context; - operationHandlesStorage = new OperationHandleResourceManager(context); - } + private SharedObjectsFactory connectionsFabric; + private IOperationHandleResourceManager operationHandlesStorage; - private static Map<String, OperationHandleControllerFactory> viewSingletonObjects = new HashMap<String, OperationHandleControllerFactory>(); - public static OperationHandleControllerFactory getInstance(ViewContext context) { - if (!viewSingletonObjects.containsKey(context.getInstanceName())) - viewSingletonObjects.put(context.getInstanceName(), new 
OperationHandleControllerFactory(context)); - return viewSingletonObjects.get(context.getInstanceName()); + public OperationHandleControllerFactory(SharedObjectsFactory connectionsFabric) { + this.connectionsFabric = connectionsFabric; + operationHandlesStorage = new OperationHandleResourceManager(connectionsFabric); } public OperationHandleController createControllerForHandle(TOperationHandle storedOperationHandle) { - return new OperationHandleController(context, storedOperationHandle, operationHandlesStorage); + return new OperationHandleController(connectionsFabric, storedOperationHandle, operationHandlesStorage); } public OperationHandleController getHandleForJob(Job job) throws ItemNotFound { http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java index cffed38..5004677 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java @@ -18,42 +18,43 @@ package org.apache.ambari.view.hive.resources.jobs; -import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive.persistence.IStorageFactory; import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy; import org.apache.ambari.view.hive.persistence.utils.Indexed; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; import org.apache.ambari.view.hive.resources.SharedCRUDResourceManager; +import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; import 
org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.hive.service.cli.thrift.TOperationHandle; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.util.List; -public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle> { +public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle> + implements IOperationHandleResourceManager { /** * Constructor - * - * @param context View Context instance */ - public OperationHandleResourceManager(ViewContext context) { - super(StoredOperationHandle.class, context); + public OperationHandleResourceManager(IStorageFactory storageFabric) { + super(StoredOperationHandle.class, storageFabric); } + @Override public List<StoredOperationHandle> readJobRelatedHandles(final Job job) { - try { - return getStorage().loadWhere(StoredOperationHandle.class, "jobId = " + job.getId()); - } catch (NotImplementedException e) { - // fallback to filtering strategy - return getStorage().loadAll(StoredOperationHandle.class, new FilteringStrategy() { - @Override - public boolean isConform(Indexed item) { - StoredOperationHandle handle = (StoredOperationHandle) item; - return (handle.getJobId() != null && handle.getJobId().equals(job.getId())); - } - }); - } + return storageFabric.getStorage().loadAll(StoredOperationHandle.class, new FilteringStrategy() { + @Override + public boolean isConform(Indexed item) { + StoredOperationHandle handle = (StoredOperationHandle) item; + return (handle.getJobId() != null && handle.getJobId().equals(job.getId())); + } + + @Override + public String whereStatement() { + return "jobId = '" + job.getId() + "'"; + } + }); } + @Override public void putHandleForJob(TOperationHandle h, Job job) { StoredOperationHandle handle = StoredOperationHandle.buildFromTOperationHandle(h); handle.setJobId(job.getId()); @@ -71,11 +72,13 @@ public class OperationHandleResourceManager extends 
SharedCRUDResourceManager<St } } + @Override public boolean containsHandleForJob(Job job) { List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job); return jobRelatedHandles.size() > 0; } + @Override public TOperationHandle getHandleForJob(Job job) throws ItemNotFound { List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job); if (jobRelatedHandles.size() == 0) http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java index 193b226..1d3f6e0 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java @@ -40,9 +40,9 @@ public class StoredOperationHandle implements Indexed { private String guid; private String secret; - private Integer jobId; + private String jobId; - private Integer id; + private String id; public StoredOperationHandle() {} public StoredOperationHandle(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException { @@ -126,21 +126,21 @@ public class StoredOperationHandle implements Indexed { this.secret = secret; } - public Integer getJobId() { + public String getJobId() { return jobId; } - public void setJobId(Integer jobId) { + public void setJobId(String jobId) { this.jobId = jobId; } @Override - public Integer getId() { + public String getId() { return id; } @Override - public void setId(Integer id) { + public void setId(String id) { this.id = id; } } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java new file mode 100644 index 0000000..b644d4c --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import org.apache.ambari.view.hive.utils.ServiceFormattedException; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class ATSParser implements IATSParser { + protected final static Logger LOG = + LoggerFactory.getLogger(ATSParser.class); + + private ATSRequestsDelegate delegate; + + private static final long MillisInSecond = 1000L; + + public ATSParser(ATSRequestsDelegate delegate) { + this.delegate = delegate; + } + + @Override + public List<HiveQueryId> getHiveQuieryIdsList(String username) { + JSONObject entities = delegate.hiveQueryIdList(username); + JSONArray jobs = (JSONArray) entities.get("entities"); + + List<HiveQueryId> parsedJobs = new LinkedList<HiveQueryId>(); + for(Object job : jobs) { + try { + HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job); + parsedJobs.add(parsedJob); + } catch (Exception ex) { + LOG.error("Error while parsing ATS job", ex); + } + } + + return parsedJobs; + } + + @Override + public HiveQueryId getHiveQuieryIdByOperationId(byte[] guid) { + String guidString = new String(guid); + JSONObject entities = delegate.hiveQueryIdByOperationId(guidString); + JSONArray jobs = (JSONArray) entities.get("entities"); + + assert jobs.size() <= 1; + if (jobs.size() == 0) { + //TODO: throw appropriate exception + throw new ServiceFormattedException("HIVE_QUERY_ID with operationid=" + guidString + " not found"); + } + + return parseAtsHiveJob((JSONObject) jobs.get(0)); + } + + @Override + public TezDagId getTezDAGByName(String name) { + JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities"); + assert tezDagEntities.size() <= 1; + if (tezDagEntities.size() == 0) { + return new TezDagId(); + } + JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0); + + 
TezDagId parsedDag = new TezDagId(); + JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId"); + parsedDag.applicationId = (String) applicationIds.get(0); + parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status"); + return parsedDag; + } + + private HiveQueryId parseAtsHiveJob(JSONObject job) { + HiveQueryId parsedJob = new HiveQueryId(); + + parsedJob.entity = (String) job.get("entity"); + parsedJob.starttime = ((Long) job.get("starttime")) / MillisInSecond; + + JSONObject primaryfilters = (JSONObject) job.get("primaryfilters"); + JSONArray operationIds = (JSONArray) primaryfilters.get("operationid"); + if (operationIds != null) { + parsedJob.operationId = (String) (operationIds).get(0); + } + JSONArray users = (JSONArray) primaryfilters.get("user"); + if (users != null) { + parsedJob.user = (String) (users).get(0); + } + + JSONObject lastEvent = getLastEvent(job); + long lastEventTimestamp = ((Long) lastEvent.get("timestamp")) / MillisInSecond; + + parsedJob.duration = lastEventTimestamp - parsedJob.starttime; + + JSONObject otherinfo = (JSONObject) job.get("otherinfo"); + JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY")); + + parsedJob.query = (String) query.get("queryText"); + JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS"); + + List<String> dagIds = new LinkedList<String>(); + List<JSONObject> stagesList = new LinkedList<JSONObject>(); + + for (Object key : stages.keySet()) { + JSONObject stage = (JSONObject) stages.get(key); + if (stage.get("Tez") != null) { + String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagName:"); + dagIds.add(dagId); + } + stagesList.add(stage); + } + parsedJob.dagNames = dagIds; + parsedJob.stages = stagesList; + return parsedJob; + } + + private JSONObject getLastEvent(JSONObject atsEntity) { + JSONArray events = (JSONArray) atsEntity.get("events"); + return 
(JSONObject) events.get(0); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java new file mode 100644 index 0000000..f5e9bcf --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import org.apache.ambari.view.ViewContext; + +import java.util.HashMap; +import java.util.Map; + +public class ATSParserFactory { + + private ViewContext context; + + public ATSParserFactory(ViewContext context) { + this.context = context; + } + + public ATSParser getATSParser() { + ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl(context)); + return new ATSParser(delegate); + } + + public static String getATSUrl(ViewContext context) { + return context.getProperties().get("yarn.ats.url"); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java new file mode 100644 index 0000000..3aa07d4 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import org.json.simple.JSONObject; + +public interface ATSRequestsDelegate { + JSONObject hiveQueryIdList(String username); + + JSONObject hiveQueryIdByOperationId(String operationId); + + JSONObject tezDagByName(String name); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java new file mode 100644 index 0000000..047bd63 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import org.apache.ambari.view.ViewContext; +import org.apache.commons.io.IOUtils; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { + protected final static Logger LOG = + LoggerFactory.getLogger(ATSRequestsDelegateImpl.class); + public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }"; + + private ViewContext context; + private String atsUrl; + + public ATSRequestsDelegateImpl(ViewContext context, String atsUrl) { + this.context = context; + this.atsUrl = atsUrl; + } + + @Override + public JSONObject hiveQueryIdList(String username) { + String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=requestuser:" + username; + String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }"); + return (JSONObject) JSONValue.parse(response); + } + + @Override + public JSONObject hiveQueryIdByOperationId(String operationId) { + String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=operationid:" + operationId; + String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }"); + return (JSONObject) JSONValue.parse(response); + } + + @Override + public JSONObject tezDagByName(String name) { + String tezDagUrl = atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=dagName:" + name; + String response = readFromWithDefault(tezDagUrl, EMPTY_ENTITIES_JSON); + return (JSONObject) JSONValue.parse(response); + } + + protected String readFromWithDefault(String hiveQueriesListUrl, String defaultResponse) { + String response; + try { + InputStream responseInputStream = context.getURLStreamProvider().readFrom(hiveQueriesListUrl, "GET", + null, new HashMap<String, String>()); + 
response = IOUtils.toString(responseInputStream); + } catch (IOException e) { + LOG.error("Error while reading from ATS", e); + response = defaultResponse; + } + return response; + } + + public String getAtsUrl() { + return atsUrl; + } + + public void setAtsUrl(String atsUrl) { + this.atsUrl = atsUrl; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java new file mode 100644 index 0000000..edb726b --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import org.json.simple.JSONObject; + +import java.util.List; + +public class HiveQueryId { + public String entity; + public String query; + + public List<String> dagNames; + + public List<JSONObject> stages; + + public long starttime; + public long duration; + public String operationId; + public String user; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java new file mode 100644 index 0000000..d029fdc --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +import java.util.List; + +public interface IATSParser { + List<HiveQueryId> getHiveQuieryIdsList(String username); + + HiveQueryId getHiveQuieryIdByOperationId(byte[] guid); + + TezDagId getTezDAGByName(String name); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java new file mode 100644 index 0000000..061c51c --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.atsJobs; + +public class TezDagId { + public static final String STATUS_UNKNOWN = "UNKNOWN"; + public String applicationId = ""; + public String dagName = ""; + public String status = STATUS_UNKNOWN; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java new file mode 100644 index 0000000..89fbb85 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.jobs.viewJobs; + +public interface IJobControllerFactory { + JobController createControllerForJob(Job job); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java new file mode 100644 index 0000000..004932c --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+
+import org.apache.ambari.view.hive.persistence.utils.Indexed;
+import org.apache.ambari.view.hive.persistence.utils.PersonalResource;
+
+import java.io.Serializable;
+
+/**
+ * Bean contract for a job. It is declared as an interface so that a
+ * {@link java.lang.reflect.Proxy} can be created for it.
+ */
+public interface Job extends Serializable, Indexed, PersonalResource {
+  // Status values. Interface fields are implicitly public, static and final.
+  String JOB_STATE_UNKNOWN = "Unknown";
+  String JOB_STATE_INITIALIZED = "Initialized";
+  String JOB_STATE_RUNNING = "Running";
+  String JOB_STATE_FINISHED = "Succeeded";
+  String JOB_STATE_CANCELED = "Canceled";
+  String JOB_STATE_CLOSED = "Closed";
+  String JOB_STATE_ERROR = "Error";
+  String JOB_STATE_PENDING = "Pending";
+
+  // ---- identity and ownership ----
+
+  String getId();
+
+  void setId(String id);
+
+  String getOwner();
+
+  void setOwner(String owner);
+
+  String getTitle();
+
+  void setTitle(String title);
+
+  // ---- query and timing ----
+
+  String getQueryFile();
+
+  void setQueryFile(String queryFile);
+
+  Long getDateSubmitted();
+
+  void setDateSubmitted(Long dateSubmitted);
+
+  Long getDuration();
+
+  void setDuration(Long duration);
+
+  /** @return one of the {@code JOB_STATE_*} values, or whatever was last set */
+  String getStatus();
+
+  void setStatus(String status);
+
+  String getForcedContent();
+
+  void setForcedContent(String forcedContent);
+
+  String getQueryId();
+
+  void setQueryId(String queryId);
+
+  // ---- storage locations and execution context ----
+
+  String getStatusDir();
+
+  void setStatusDir(String statusDir);
+
+  String getDataBase();
+
+  void setDataBase(String dataBase);
+
+  String getLogFile();
+
+  void setLogFile(String logFile);
+
+  String getConfFile();
+
+  void setConfFile(String confFile);
+
+  String getApplicationId();
+
+  void setApplicationId(String applicationId);
+
+  String getDagName();
+
+  void setDagName(String dagName);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
new file mode 100644
index 0000000..339e194
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.hive.client.Cursor;
+import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+
+/**
+ * Operations available on a single {@link Job}: submitting and cancelling it,
+ * refreshing its state, reading its results, and tracking whether the job
+ * bean has been modified.
+ */
+public interface JobController {
+  /** Submits the job for execution. */
+  void submit();
+
+  /**
+   * Cancels the job.
+   * @throws ItemNotFound declared by the contract; presumably raised when
+   *                      nothing is found for the job to cancel
+   */
+  void cancel() throws ItemNotFound;
+
+  /**
+   * @return the job bean; see {@link #getJobPOJO()} for the unproxied object
+   */
+  Job getJob();
+
+  /**
+   * Use carefully. Returns the unproxied bean object.
+   * @return unproxied bean object
+   */
+  Job getJobPOJO();
+
+  /**
+   * @return cursor over the results of the job
+   * @throws ItemNotFound declared by the contract; presumably raised when
+   *                      no results source is found for the job
+   */
+  Cursor getResults() throws ItemNotFound;
+
+  /** Hook to run after the job has been created. */
+  void afterCreation();
+
+  /** Refreshes the state of the job. */
+  void update();
+
+  /** @return the current value of the modification flag */
+  boolean isModified();
+
+  /** Resets the modification flag. */
+  void clearModified();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
new file mode 100644
index 0000000..12d1cdb
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive.utils.SharedObjectsFactory;
+
+/**
+ * Default {@link IJobControllerFactory}: builds a {@link JobControllerImpl}
+ * for a job, taking every collaborator from the {@link SharedObjectsFactory}.
+ */
+public class JobControllerFactory implements IJobControllerFactory {
+  private final ViewContext context;
+  private final SharedObjectsFactory sharedObjectsFactory;
+
+  /**
+   * @param context view context passed on to each created controller
+   * @param sharedObjectsFactory provider of the controller's collaborators
+   */
+  public JobControllerFactory(ViewContext context, SharedObjectsFactory sharedObjectsFactory) {
+    this.context = context;
+    this.sharedObjectsFactory = sharedObjectsFactory;
+  }
+
+  /**
+   * Creates a controller for {@code job}. The collaborators are requested
+   * from the shared factory on every call.
+   */
+  @Override
+  public JobController createControllerForJob(Job job) {
+    return new JobControllerImpl(
+        context,
+        job,
+        sharedObjectsFactory.getHiveConnectionController(),
+        sharedObjectsFactory.getOperationHandleControllerFactory(),
+        sharedObjectsFactory.getSavedQueryResourceManager(),
+        sharedObjectsFactory.getATSParser(),
+        sharedObjectsFactory.getHdfsApi());
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
new file mode 100644
index 0000000..a100f3d
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive.client.*;
+import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive.resources.jobs.*;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery;
+import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager;
+import org.apache.ambari.view.hive.utils.*;
+import org.apache.ambari.view.hive.utils.HdfsApi;
+import org.apache.ambari.view.hive.utils.HdfsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Controller for a single {@link Job}. It submits the job's query through the
+ * Hive session, refreshes status, logs and duration, and prepares the job's
+ * status directory, query file and log file.
+ *
+ * The job returned by {@link #getJob()} is a proxy that reports every call to
+ * this controller, which then raises the {@link #isModified()} flag.
+ */
+public class JobControllerImpl implements JobController, ModifyNotificationDelegate {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(JobControllerImpl.class);
+
+  /** Database used when the job does not name one. */
+  private static final String DEFAULT_DB = "default";
+
+  /** Divisor turning {@link System#currentTimeMillis()} into seconds. */
+  private static final long MILLIS_IN_SECOND = 1000L;
+
+  private final ViewContext context;
+  private final HdfsApi hdfsApi;
+  private Job jobUnproxied;
+  private Job job;
+  private boolean modified;
+
+  private final OperationHandleControllerFactory opHandleControllerFactory;
+  private final ConnectionController hiveSession;
+  private final SavedQueryResourceManager savedQueryResourceManager;
+  private final IATSParser atsParser;
+
+  /**
+   * JobController constructor
+   * Warning: Create JobControllers ONLY using JobControllerFactory!
+   */
+  public JobControllerImpl(ViewContext context, Job job,
+                           ConnectionController hiveSession,
+                           OperationHandleControllerFactory opHandleControllerFactory,
+                           SavedQueryResourceManager savedQueryResourceManager,
+                           IATSParser atsParser,
+                           HdfsApi hdfsApi) {
+    this.context = context;
+    setJobPOJO(job);
+    this.opHandleControllerFactory = opHandleControllerFactory;
+    this.hiveSession = hiveSession;
+    this.savedQueryResourceManager = savedQueryResourceManager;
+    this.atsParser = atsParser;
+    this.hdfsApi = hdfsApi;
+  }
+
+  /**
+   * Reads the query text from the job's query file.
+   *
+   * @return content of page 0 of the query file
+   * @throws ServiceFormattedException if reading fails or is interrupted
+   */
+  public String getQueryForJob() {
+    FilePaginator paginator = new FilePaginator(job.getQueryFile(), hdfsApi);
+    try {
+      // warning - reading only 0 page restricts size of query to 1MB
+      return paginator.readPage(0);
+    } catch (IOException e) {
+      throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
+    } catch (InterruptedException e) {
+      // Restore the flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
+    }
+  }
+
+  /**
+   * @return the job's database, or {@code "default"} when the job has none
+   */
+  public String getJobDatabase() {
+    String database = job.getDataBase();
+    return database != null ? database : DEFAULT_DB;
+  }
+
+  /**
+   * Selects the job's database, executes the job's query on the Hive session
+   * and persists the resulting operation handle for the job.
+   */
+  @Override
+  public void submit() {
+    setupHiveBeforeQueryExecute();
+
+    String query = getQueryForJob();
+    OperationHandleController handleController = hiveSession.executeQuery(query);
+
+    handleController.persistHandleForJob(job);
+  }
+
+  /** Switches the Hive session to the database returned by {@link #getJobDatabase()}. */
+  private void setupHiveBeforeQueryExecute() {
+    String database = getJobDatabase();
+    hiveSession.selectDatabase(database);
+  }
+
+  /**
+   * Cancels the operation whose handle is stored for the job.
+   *
+   * @throws ItemNotFound propagated from the handle lookup
+   */
+  @Override
+  public void cancel() throws ItemNotFound {
+    OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+    handle.cancel();
+  }
+
+  /** Refreshes the operation status, then the logs, then the job duration. */
+  @Override
+  public void update() {
+    updateOperationStatus();
+    updateOperationLogs();
+
+    updateJobDuration();
+  }
+
+  /**
+   * Copies the operation status into the job. A Hive error status is recorded
+   * as {@link Job#JOB_STATE_UNKNOWN}; a missing status or a missing handle
+   * leaves the job untouched.
+   *
+   * @throws ServiceFormattedException if the Hive client fails
+   */
+  public void updateOperationStatus() {
+    try {
+
+      OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+      String status = handle.getOperationStatus();
+      job.setStatus(status);
+      LOG.debug("Status of job#{} is {}", job.getId(), job.getStatus());
+
+    } catch (NoOperationStatusSetException e) {
+      LOG.info("Operation state is not set for job#{}", job.getId());
+
+    } catch (HiveErrorStatusException e) {
+      LOG.debug("Error updating status for job#{}: {}", job.getId(), e.getMessage());
+      job.setStatus(Job.JOB_STATE_UNKNOWN);
+
+    } catch (HiveClientException e) {
+      throw new ServiceFormattedException("Could not fetch job status " + job.getId(), e);
+
+    } catch (ItemNotFound itemNotFound) {
+      LOG.debug("No TOperationHandle for job#{}, can't update status", job.getId());
+    }
+  }
+
+  /**
+   * Fetches the operation logs, stores the identifier of the last application
+   * found in them on the job, and writes the logs to the job's log file.
+   * Fetch failures and a missing handle are logged and otherwise ignored.
+   */
+  public void updateOperationLogs() {
+    try {
+      OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+      String logs = handle.getLogs();
+
+      LogParser info = LogParser.parseLog(logs);
+      LogParser.AppId app = info.getLastAppInList();
+      if (app != null) {
+        job.setApplicationId(app.getIdentifier());
+      }
+
+      String logFilePath = job.getLogFile();
+      HdfsUtil.putStringToFile(hdfsApi, logFilePath, logs);
+
+    } catch (HiveClientRuntimeException ex) {
+      // Pass the exception itself so the stack trace is not lost.
+      LOG.error("Error while fetching logs: " + ex.getMessage(), ex);
+    } catch (ItemNotFound itemNotFound) {
+      LOG.debug("No TOperationHandle for job#{}, can't read logs", job.getId());
+    }
+  }
+
+  /**
+   * @return {@code true} when the status is Succeeded, Canceled, Closed, Error
+   *         or Unknown; {@code false} otherwise, including a {@code null} status
+   */
+  public boolean isJobEnded() {
+    String status = job.getStatus();
+    // Constant-first comparisons keep a job without a status from raising an NPE.
+    return Job.JOB_STATE_FINISHED.equals(status) || Job.JOB_STATE_CANCELED.equals(status) ||
+        Job.JOB_STATE_CLOSED.equals(status) || Job.JOB_STATE_ERROR.equals(status) ||
+        Job.JOB_STATE_UNKNOWN.equals(status); // Unknown is not finished, but polling makes no sense
+  }
+
+  @Override
+  public Job getJob() {
+    return job;
+  }
+
+  /**
+   * Use carefully. Returns unproxied bean object
+   * @return unproxied bean object
+   */
+  @Override
+  public Job getJobPOJO() {
+    return jobUnproxied;
+  }
+
+  /**
+   * Stores the given bean and wraps it in a {@link Proxy} whose invocation
+   * handler notifies this controller.
+   *
+   * @param jobPOJO unproxied job bean
+   */
+  public void setJobPOJO(Job jobPOJO) {
+    Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(),
+        new Class[]{Job.class},
+        new ModifyNotificationInvocationHandler(jobPOJO, this));
+    this.job = jobModifyNotificationProxy;
+
+    this.jobUnproxied = jobPOJO;
+  }
+
+  /**
+   * @return results of the operation whose handle is stored for the job
+   * @throws ItemNotFound propagated from the handle lookup
+   */
+  @Override
+  public Cursor getResults() throws ItemNotFound {
+    OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+    return handle.getResults();
+  }
+
+  /**
+   * Fills in the status directory, query file and log file when they are not
+   * set yet, then stamps the submission date. The status directory comes
+   * first because the other two paths are built from it.
+   */
+  @Override
+  public void afterCreation() {
+    setupStatusDirIfNotPresent();
+    setupQueryFileIfNotPresent();
+    setupLogFileIfNotPresent();
+
+    setCreationDate();
+  }
+
+  /** Creates the log file when the job has no log file path. */
+  public void setupLogFileIfNotPresent() {
+    if (job.getLogFile() == null || job.getLogFile().isEmpty()) {
+      setupLogFile();
+    }
+  }
+
+  /** Creates the query file when the job has no query file path. */
+  public void setupQueryFileIfNotPresent() {
+    if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) {
+      setupQueryFile();
+    }
+  }
+
+  /** Allocates a status directory when the job has none. */
+  public void setupStatusDirIfNotPresent() {
+    if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) {
+      setupStatusDir();
+    }
+  }
+
+  /**
+   * Sets the duration to the seconds elapsed since the submission date.
+   * Does nothing when the job has no submission date yet.
+   */
+  public void updateJobDuration() {
+    Long dateSubmitted = job.getDateSubmitted();
+    if (dateSubmitted == null) {
+      // Unboxing null would throw; there is nothing to measure from yet.
+      LOG.debug("No submission date for job#{}, can't update duration", job.getId());
+      return;
+    }
+    job.setDuration(System.currentTimeMillis() / MILLIS_IN_SECOND - dateSubmitted);
+  }
+
+  /** Sets the submission date to the current time in seconds. */
+  public void setCreationDate() {
+    job.setDateSubmitted(System.currentTimeMillis() / MILLIS_IN_SECOND);
+  }
+
+  /** Writes an empty "logs" file into the status directory and records its path on the job. */
+  private void setupLogFile() {
+    LOG.debug("Creating log file for job#{}", job.getId());
+
+    String logFile = job.getStatusDir() + "/" + "logs";
+    HdfsUtil.putStringToFile(hdfsApi, logFile, "");
+
+    job.setLogFile(logFile);
+    LOG.debug("Log file for job#{}: {}", job.getId(), logFile);
+  }
+
+  /** Picks an unallocated name based on the status directory prefix and records it on the job. */
+  private void setupStatusDir() {
+    String newDirPrefix = makeStatusDirectoryPrefix();
+    String newDir = HdfsUtil.findUnallocatedFileName(hdfsApi, newDirPrefix, "");
+
+    job.setStatusDir(newDir);
+    LOG.debug("Status dir for job#{}: {}", job.getId(), newDir);
+  }
+
+  /**
+   * Builds {@code <jobs.dir>/hive-job-<id>-<timestamp>}.
+   *
+   * @throws MisconfigurationFormattedException if {@code jobs.dir} is not configured
+   */
+  private String makeStatusDirectoryPrefix() {
+    String userScriptsPath = context.getProperties().get("jobs.dir");
+
+    if (userScriptsPath == null) { // TODO: move check to initialization code
+      String msg = "jobs.dir is not configured!";
+      LOG.error(msg);
+      throw new MisconfigurationFormattedException("jobs.dir");
+    }
+
+    String normalizedName = String.format("hive-job-%s", job.getId());
+    // HH is the 24-hour clock; hh would give the same name for 03:15 and 15:15.
+    String timestamp = new SimpleDateFormat("yyyy-MM-dd_HH-mm").format(new Date());
+    // The configured path is an argument, not part of the pattern, so a '%' in it is harmless.
+    return String.format("%s/%s-%s", userScriptsPath, normalizedName, timestamp);
+  }
+
+  /**
+   * Creates {@code query.hql} in the status directory, either from the job's
+   * forced content or by copying the file of the saved query named by the
+   * job's query id, and records the path on the job.
+   *
+   * @throws BadRequestFormattedException if the job has neither forced content nor a query id
+   * @throws ServiceFormattedException if the copy fails or is interrupted
+   */
+  private void setupQueryFile() {
+    String statusDir = job.getStatusDir();
+    assert statusDir != null : "setupStatusDir() should be called first";
+
+    String jobQueryFilePath = statusDir + "/" + "query.hql";
+
+    try {
+
+      if (job.getForcedContent() != null) {
+
+        HdfsUtil.putStringToFile(hdfsApi, jobQueryFilePath, job.getForcedContent());
+        job.setForcedContent("");  // prevent forcedContent to be written to DB
+
+      } else if (job.getQueryId() != null) {
+
+        String savedQueryFile = getRelatedSavedQueryFile();
+        hdfsApi.copy(savedQueryFile, jobQueryFilePath);
+
+      } else {
+
+        throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null);
+
+      }
+
+    } catch (IOException e) {
+      throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
+    } catch (InterruptedException e) {
+      // Restore the flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
+    }
+    job.setQueryFile(jobQueryFilePath);
+
+    LOG.debug("Query file for job#{}: {}", job.getId(), jobQueryFilePath);
+  }
+
+  /**
+   * @return query file of the saved query referenced by the job's query id
+   * @throws BadRequestFormattedException if no such saved query exists
+   */
+  private String getRelatedSavedQueryFile() {
+    SavedQuery savedQuery;
+    try {
+      savedQuery = savedQueryResourceManager.read(job.getQueryId());
+    } catch (ItemNotFound itemNotFound) {
+      throw new BadRequestFormattedException("queryId not found!", itemNotFound);
+    }
+    return savedQuery.getQueryFile();
+  }
+
+  /**
+   * Raises the modification flag.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean onModification(Object object) {
+    setModified(true);
+    return true;
+  }
+
+  @Override
+  public boolean isModified() {
+    return modified;
+  }
+
+  public void setModified(boolean modified) {
+    this.modified = modified;
+  }
+
+  @Override
+  public void clearModified() {
+    setModified(false);
+  }
+}