OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/1122898e Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/1122898e Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/1122898e Branch: refs/remotes/trunk Commit: 1122898e51e3e09cd0399565d5cfc15c59fa9ead Parents: e7c642c Author: Purshotam Shah <[email protected]> Authored: Wed Sep 10 12:04:52 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Wed Sep 10 12:04:52 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 7 +- .../oozie/action/email/EmailActionExecutor.java | 4 +- .../apache/oozie/command/wf/JobXCommand.java | 2 +- .../executor/jpa/CoordJobQueryExecutor.java | 17 +- .../service/AbandonedCoordCheckerService.java | 202 +++++++++++++++++++ core/src/main/resources/oozie-default.xml | 36 ++++ .../coord/TestAbandonedCoordChecker.java | 183 +++++++++++++++++ release-log.txt | 1 + 8 files changed, 447 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 14fd74c..3dc7450 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -32,6 +32,8 @@ import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Lob; +import javax.persistence.NamedNativeQueries; +import javax.persistence.NamedNativeQuery; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; @@ -129,7 +131,10 @@ import 
org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user from CoordinatorJobBean w where w.id = :id"), - @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id") + @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")}) + +@NamedNativeQueries({ + @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = "select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT' or a.STATUS = 'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group by b.job_id having count(*) > 0 ) successJobs on failedJobs.job_id = successJobs.job_id where successJobs.job_id is null )") }) @Table(name = "COORD_JOBS") http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java index d189ba8..21c6313 100644 --- a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java @@ -125,10 +125,10 @@ public class EmailActionExecutor extends ActionExecutor { // All good - lets try to mail! 
- email(context, tos, ccs, subject, body, contentType); + email(tos, ccs, subject, body, contentType); } - protected void email(Context context, String[] to, String[] cc, String subject, String body, String contentType) + public void email(String[] to, String[] cc, String subject, String body, String contentType) throws ActionExecutorException { // Get mailing server details. String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost"); http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java index 412019e..747d935 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java @@ -84,7 +84,7 @@ public class JobXCommand extends WorkflowXCommand<WorkflowJobBean> { * @param jobId : Job ID to retrieve console URL * @return console URL */ - static String getJobConsoleUrl(String jobId) { + public static String getJobConsoleUrl(String jobId) { String consoleUrl = Services.get().getConf().get("oozie.JobCommand.job.console.url", null); return (consoleUrl != null) ? 
consoleUrl + jobId : null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 25953bf..2c9e00e 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import javax.persistence.EntityManager; import javax.persistence.Query; +import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; @@ -61,7 +62,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_SUSPEND_KILL, GET_COORD_JOB_STATUS_PARENTID, GET_COORD_JOBS_CHANGED, - GET_COORD_JOBS_OLDER_FOR_MATERILZATION + GET_COORD_JOBS_OLDER_FOR_MATERILZATION, + GET_COORD_FOR_ABANDONEDCHECK }; private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); @@ -205,6 +207,10 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo query.setMaxResults(limit); } break; + case GET_COORD_FOR_ABANDONEDCHECK: + query.setParameter(1, (Integer) parameters[0]); + query.setParameter(2, (Timestamp) parameters[1]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " @@ -321,6 +327,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean = new CoordinatorJobBean(); bean.setId((String) ret); break; + case GET_COORD_FOR_ABANDONEDCHECK: + bean = new CoordinatorJobBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setUser((String) arr[1]); + bean.setGroup((String) arr[2]); + 
bean.setAppName((String) arr[3]); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java new file mode 100644 index 0000000..b082567 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.service; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.action.email.EmailActionExecutor; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordKillXCommand; +import org.apache.oozie.command.wf.JobXCommand; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.XLog; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The Abandoned Coord Checker Service finds abandoned coord jobs in the system, reports them by email and, if + * kill.jobs is enabled, kills them. A job is considered abandoned/faulty if the total number of its actions in + * failed/timedout/suspended state is >= failure.limit, it has no succeeded action, and its start time is more than + * job.older.than minutes ago. No email is sent if oozie.service.AbandonedCoordCheckerService.email.address is not configured. 
+ */ +public class AbandonedCoordCheckerService implements Service { + + private static final String CONF_PREFIX = Service.CONF_PREFIX + "AbandonedCoordCheckerService."; + private static final String TO_ADDRESS = CONF_PREFIX + "email.address"; + private static final String CONTENT_TYPE = "text/html"; + private static final String SUBJECT = "Abandoned Coordinators report"; + private static final String CONF_CHECK_INTERVAL = CONF_PREFIX + "check.interval"; + private static final String CONF_CHECK_DELAY = CONF_PREFIX + "check.delay"; + private static final String CONF_FAILURE_LEN = CONF_PREFIX + "failure.limit"; + private static final String CONF_JOB_OLDER_THAN = CONF_PREFIX + "job.older.than"; + + private static final int DEFAULT_FAILURE_LEN = 20; + private static final int DEFAULT_CHECK_INTERVAL = 24 * 60; // Once a day + private static final int DEFAULT_CHECK_DELAY = 1 * 60; // One hour. + private static final int DEFAULT_CONF_JOB_OLDER_THAN = 2880; // Two days + + private static final String CONF_JOB_KILL = CONF_PREFIX + "kill.jobs"; + private static final boolean DEFAULT_JOB_KILL = false; + public static final String OOZIE_BASE_URL = "oozie.base.url"; + private static String[] to; + private static String serverURL; + + public static class AbandonedCoordCheckerRunnable implements Runnable { + private StringBuilder msg; + final int failureLimit; + XLog LOG = XLog.getLog(getClass()); + private boolean shouldKill = DEFAULT_JOB_KILL; + + public AbandonedCoordCheckerRunnable(int failureLimit) { + this(failureLimit, false); + } + + public AbandonedCoordCheckerRunnable(int failureLimit, boolean shouldKill) { + this.failureLimit = failureLimit; + this.shouldKill = shouldKill; + } + + public void run() { + if (!Services.get().get(JobsConcurrencyService.class).isLeader()) { + LOG.info("Server is not primary server. 
Skipping run"); + return; + } + msg = new StringBuilder(); + XLog.Info.get().clear(); + msg.append("<!DOCTYPE html><html><head><style>table,th,td{border:1px solid black;border-collapse:collapse;}</style>" + + "</head><body><table>"); + addTableHeader(); + try { + checkCoordJobs(); + msg.append("</table></body></html>"); + sendMail(msg.toString()); + } + catch (Exception e) { + LOG.error("Error running AbandonedCoordChecker", e); + } + } + + /** + * Check coordinator + * + * @throws CommandException + */ + private void checkCoordJobs() throws CommandException { + + List<CoordinatorJobBean> jobs; + try { + Timestamp createdTS = new Timestamp( + System.currentTimeMillis() + - (Services.get().getConf() + .getInt(CONF_JOB_OLDER_THAN, DEFAULT_CONF_JOB_OLDER_THAN) * 60 * 1000)); + + jobs = CoordJobQueryExecutor.getInstance().getList(CoordJobQuery.GET_COORD_FOR_ABANDONEDCHECK, + failureLimit, createdTS); + + for (CoordinatorJobBean job : jobs) { + String killStatus = "Coord kill is disabled"; + LOG.info("Abandoned Coord found : " + job.getId()); + if (shouldKill) { + try { + new CoordKillXCommand(job.getId()).call(); + LOG.info("Killed abandoned coord : " + job.getId()); + killStatus = "Successful"; + } + catch (Exception e) { + LOG.error("Can't kill abandoned coord : " + job.getId(), e); + killStatus = " Failed : " + e.getMessage(); + } + } + addCoordToMessage(job, killStatus); + } + } + catch (JPAExecutorException je) { + throw new CommandException(je); + } + } + + public void addCoordToMessage(CoordinatorJobBean job, String killStatus) { + msg.append("<tr>"); + msg.append("<td><a href=\"").append(JobXCommand.getJobConsoleUrl(job.getId())).append("\">") + .append(job.getId()).append("</a></td>"); + msg.append("<td>").append(job.getAppName()).append("</td>"); + msg.append("<td>").append(job.getUser()).append("</td>"); + msg.append("<td>").append(job.getGroup()).append("</td>"); + msg.append("<td>").append(killStatus).append("</td>"); + msg.append("</tr>"); + } + + public 
void addTableHeader() { + msg.append("<tr>"); + msg.append("<td>").append("Coordinator id").append("</td>"); + msg.append("<td>").append("Coordinator name").append("</td>"); + msg.append("<td>").append("User name").append("</td>"); + msg.append("<td>").append("Group").append("</td>"); + msg.append("<td>").append("Kill Status").append("</td>"); + msg.append("</tr>"); + } + + @VisibleForTesting + public String getMessage() { + return msg.toString(); + } + + public void sendMail(String body) throws Exception { + if (to == null || to.length == 0 || (to.length == 1 && StringUtils.isEmpty(to[0]))) { + LOG.info(TO_ADDRESS + " is not configured. Not sending email"); + return; + } + EmailActionExecutor email = new EmailActionExecutor(); + String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date()); + email.email(to, new String[0], subject, body, CONTENT_TYPE); + } + } + + @Override + public void init(Services services) { + Configuration conf = services.getConf(); + to = conf.getStrings(TO_ADDRESS); + int failureLen = conf.getInt(CONF_FAILURE_LEN, DEFAULT_FAILURE_LEN); + boolean shouldKill = conf.getBoolean(CONF_JOB_KILL, DEFAULT_JOB_KILL); + serverURL = conf.get(OOZIE_BASE_URL); + + int delay = conf.getInt(CONF_CHECK_DELAY, DEFAULT_CHECK_DELAY); + + Runnable actionCheckRunnable = new AbandonedCoordCheckerRunnable(failureLen, shouldKill); + services.get(SchedulerService.class).schedule(actionCheckRunnable, delay, + conf.getInt(CONF_CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL), SchedulerService.Unit.MIN); + + } + + @Override + public void destroy() { + } + + @Override + public Class<? 
extends Service> getInterface() { + return AbandonedCoordCheckerService.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index b931d52..6a91dc6 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2203,4 +2203,40 @@ </description> </property> + <property> + <name>oozie.service.AbandonedCoordCheckerService.check.interval + </name> + <value>1440</value> + <description> + Interval, in minutes, at which AbandonedCoordCheckerService should run. + </description> + </property> + + <property> + <name>oozie.service.AbandonedCoordCheckerService.failure.limit + </name> + <value>25</value> + <description> + Failure limit. A job is considered to be abandoned/faulty if total number of actions in + failed/timedout/suspended >= "Failure limit" and there are no succeeded action. + </description> + </property> + + <property> + <name>oozie.service.AbandonedCoordCheckerService.kill.jobs + </name> + <value>false</value> + <description> + If true, AbandonedCoordCheckerService will kill abandoned coords. + </description> + </property> + + <property> + <name>oozie.service.AbandonedCoordCheckerService.job.older.than</name> + <value>2880</value> + <description> + In minutes, job will be considered as abandoned/faulty if job is older than this value. 
+ </description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java new file mode 100644 index 0000000..fa47dd6 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.coord; + +import java.util.Date; + +import org.apache.commons.lang.time.DateUtils; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.service.AbandonedCoordCheckerService.AbandonedCoordCheckerRunnable; + +public class TestAbandonedCoordChecker extends XDataTestCase { + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void tesAbandonedFailed() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 5); + addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED); + + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 4); + addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED); + + AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5); + coordChecked.run(); + String msg = coordChecked.getMessage(); + assertTrue(msg.contains(job1.getId())); + assertFalse(msg.contains(job2.getId())); + + } + + public void testAbandoned_notAbandoned() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + + final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 6); + + addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.SUCCEEDED, + CoordinatorAction.Status.FAILED); + + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true, + false, 6); + + addRecordToCoordActionTable(job2.getId(), 6, CoordinatorAction.Status.SUCCEEDED, + CoordinatorAction.Status.FAILED); + + AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5); + coordChecked.run(); + String msg = coordChecked.getMessage(); + assertFalse(msg.contains(job1.getId())); + assertFalse(msg.contains(job2.getId())); + } + + public void testMessage_withTimedout() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 12); + + addRecordToCoordActionTable(job1.getId(), 12, CoordinatorAction.Status.TIMEDOUT); + + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 4); + addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.TIMEDOUT); + + AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(10); + coordChecked.run(); + String msg = coordChecked.getMessage(); + assertTrue(msg.contains(job1.getId())); + assertFalse(msg.contains(job2.getId())); + + } + + public void testMessage_withMixedStatus() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 5); + + addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED, + CoordinatorAction.Status.SUSPENDED, 
CoordinatorAction.Status.TIMEDOUT); + + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, + false, 5); + + addRecordToCoordActionTable(job2.getId(), 5, CoordinatorAction.Status.FAILED, + CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT); + + final CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true, + false, 5); + addRecordToCoordActionTable(job3.getId(), 5, CoordinatorAction.Status.FAILED, + CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT); + + AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5); + coordChecked.run(); + String msg = coordChecked.getMessage(); + assertTrue(msg.contains(job1.getId())); + assertTrue(msg.contains(job2.getId())); + assertFalse(msg.contains(job3.getId())); + } + + public void testKill() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6); + addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED); + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4); + addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED); + new AbandonedCoordCheckerRunnable(5, true).run(); + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus(), + CoordinatorJob.Status.KILLED); + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus(), + CoordinatorJob.Status.RUNNING); + } + + public void testStartTime() throws Exception { + Date start = new Date(); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6); + addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED); + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4); + addRecordToCoordActionTable(job2.getId(), 10, CoordinatorAction.Status.FAILED); + new AbandonedCoordCheckerRunnable(5, true).run(); + + // Both job should be running as starttime > 2 days buffer + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus(), + CoordinatorJob.Status.RUNNING); + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus(), + CoordinatorJob.Status.RUNNING); + } + + private void addRecordToCoordActionTable(String jobId, int count, CoordinatorAction.Status... status) + throws Exception { + + for (int i = 1; i <= count; i++) { + CoordinatorAction.Status jobStatus = status[status.length - 1]; + if (i <= status.length) { + jobStatus = status[i - 1]; + } + addRecordToCoordActionTable(jobId, i, jobStatus, "coord-action-get.xml", 0); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b3bbb2f..7ce842b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs (puru) OOZIE-1847 HA - Oozie servers should shutdown (or go in safe mode) in case of ZK failure (puru) OOZIE-1957 Coord update command override group when oozie.service.AuthorizationService.default.group.as.acl is set and group/acl is not configured in job property (puru) OOZIE-1818 CoordMaterializeTransitionXCommand verifyPrecondition doesn't verify current time (puru)
