Repository: oozie
Updated Branches:
  refs/heads/master e7c642c1c -> 1122898e5


OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/1122898e
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/1122898e
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/1122898e

Branch: refs/heads/master
Commit: 1122898e51e3e09cd0399565d5cfc15c59fa9ead
Parents: e7c642c
Author: Purshotam Shah <[email protected]>
Authored: Wed Sep 10 12:04:52 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Wed Sep 10 12:04:52 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    |   7 +-
 .../oozie/action/email/EmailActionExecutor.java |   4 +-
 .../apache/oozie/command/wf/JobXCommand.java    |   2 +-
 .../executor/jpa/CoordJobQueryExecutor.java     |  17 +-
 .../service/AbandonedCoordCheckerService.java   | 202 +++++++++++++++++++
 core/src/main/resources/oozie-default.xml       |  36 ++++
 .../coord/TestAbandonedCoordChecker.java        | 183 +++++++++++++++++
 release-log.txt                                 |   1 +
 8 files changed, 447 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 14fd74c..3dc7450 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -32,6 +32,8 @@ import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.Lob;
+import javax.persistence.NamedNativeQueries;
+import javax.persistence.NamedNativeQuery;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
 import javax.persistence.Table;
@@ -129,7 +131,10 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user 
from CoordinatorJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select 
w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")
+        @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select 
w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")})
+
+@NamedNativeQueries({
+        @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = 
"select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( 
w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 
and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions 
a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT'  or a.STATUS = 
'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER 
JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group 
by b.job_id having count(*) > 0 ) successJobs   on  failedJobs.job_id = 
successJobs.job_id where successJobs.job_id is null )")
 
 })
 @Table(name = "COORD_JOBS")

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
index d189ba8..21c6313 100644
--- a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
@@ -125,10 +125,10 @@ public class EmailActionExecutor extends ActionExecutor {
 
 
         // All good - lets try to mail!
-        email(context, tos, ccs, subject, body, contentType);
+        email(tos, ccs, subject, body, contentType);
     }
 
-    protected void email(Context context, String[] to, String[] cc, String 
subject, String body, String contentType)
+    public void email(String[] to, String[] cc, String subject, String body, 
String contentType)
             throws ActionExecutorException {
         // Get mailing server details.
         String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost");

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
index 412019e..747d935 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
@@ -84,7 +84,7 @@ public class JobXCommand extends 
WorkflowXCommand<WorkflowJobBean> {
      * @param jobId : Job ID to retrieve console URL
      * @return console URL
      */
-    static String getJobConsoleUrl(String jobId) {
+    public static String getJobConsoleUrl(String jobId) {
         String consoleUrl = 
Services.get().getConf().get("oozie.JobCommand.job.console.url", null);
         return (consoleUrl != null) ? consoleUrl + jobId : null;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 25953bf..2c9e00e 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
+import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
@@ -61,7 +62,8 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
         GET_COORD_JOB_SUSPEND_KILL,
         GET_COORD_JOB_STATUS_PARENTID,
         GET_COORD_JOBS_CHANGED,
-        GET_COORD_JOBS_OLDER_FOR_MATERILZATION
+        GET_COORD_JOBS_OLDER_FOR_MATERILZATION,
+        GET_COORD_FOR_ABANDONEDCHECK
     };
 
     private static CoordJobQueryExecutor instance = new 
CoordJobQueryExecutor();
@@ -205,6 +207,10 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
                     query.setMaxResults(limit);
                 }
                 break;
+            case GET_COORD_FOR_ABANDONEDCHECK:
+                query.setParameter(1, (Integer) parameters[0]);
+                query.setParameter(2, (Timestamp) parameters[1]);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot set parameters for "
@@ -321,6 +327,15 @@ public class CoordJobQueryExecutor extends 
QueryExecutor<CoordinatorJobBean, Coo
                 bean = new CoordinatorJobBean();
                 bean.setId((String) ret);
                 break;
+            case GET_COORD_FOR_ABANDONEDCHECK:
+                bean = new CoordinatorJobBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setUser((String) arr[1]);
+                bean.setGroup((String) arr[2]);
+                bean.setAppName((String) arr[3]);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java 
b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
new file mode 100644
index 0000000..b082567
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.action.email.EmailActionExecutor;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordKillXCommand;
+import org.apache.oozie.command.wf.JobXCommand;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLog;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Periodically looks for abandoned (faulty) coordinator jobs, reports them by email and optionally kills them.
+ * <p>
+ * A coordinator job in RUNNING or RUNNINGWITHERROR status is considered abandoned when it has at least
+ * {@code failure.limit} actions in FAILED, TIMEDOUT or SUSPENDED status, no SUCCEEDED action, and a start time
+ * older than {@code job.older.than} minutes. Abandoned jobs are killed only if {@code kill.jobs} is true. The
+ * report email is not sent if {@code oozie.service.AbandonedCoordCheckerService.email.address} is not configured.
+ * <p>
+ * The check is skipped on servers that are not the leader according to {@link JobsConcurrencyService}.
+ */
+public class AbandonedCoordCheckerService implements Service {
+
+    private static final String CONF_PREFIX = Service.CONF_PREFIX + "AbandonedCoordCheckerService.";
+    private static final String TO_ADDRESS = CONF_PREFIX + "email.address";
+    private static final String CONTENT_TYPE = "text/html";
+    private static final String SUBJECT = "Abandoned Coordinators report";
+    private static final String CONF_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
+    private static final String CONF_CHECK_DELAY = CONF_PREFIX + "check.delay";
+    private static final String CONF_FAILURE_LEN = CONF_PREFIX + "failure.limit";
+    private static final String CONF_JOB_OLDER_THAN = CONF_PREFIX + "job.older.than";
+
+    // Defaults below apply only when a property is missing; keep them in sync with oozie-default.xml.
+    private static final int DEFAULT_FAILURE_LEN = 25;
+    private static final int DEFAULT_CHECK_INTERVAL = 24 * 60; // Once a day
+    private static final int DEFAULT_CHECK_DELAY = 1 * 60; // One hour.
+    private static final int DEFAULT_CONF_JOB_OLDER_THAN = 2880; // Two days, in minutes
+
+    private static final String CONF_JOB_KILL = CONF_PREFIX + "kill.jobs";
+    private static final boolean DEFAULT_JOB_KILL = false;
+    public static final String OOZIE_BASE_URL = "oozie.base.url";
+    // Report recipients and server URL, read from the configuration in init().
+    private static String[] to;
+    private static String serverURL;
+
+    /**
+     * Performs one abandoned-coordinator check and builds an HTML report of the jobs found.
+     */
+    public static class AbandonedCoordCheckerRunnable implements Runnable {
+        private StringBuilder msg;
+        final int failureLimit;
+        XLog LOG = XLog.getLog(getClass());
+        private boolean shouldKill = DEFAULT_JOB_KILL;
+
+        /**
+         * Creates a checker that reports abandoned coordinators but never kills them.
+         *
+         * @param failureLimit minimum number of FAILED/TIMEDOUT/SUSPENDED actions for a job to be reported
+         */
+        public AbandonedCoordCheckerRunnable(int failureLimit) {
+            this(failureLimit, false);
+        }
+
+        /**
+         * Creates a checker.
+         *
+         * @param failureLimit minimum number of FAILED/TIMEDOUT/SUSPENDED actions for a job to be reported
+         * @param shouldKill if true, every abandoned coordinator found is killed with {@link CoordKillXCommand}
+         */
+        public AbandonedCoordCheckerRunnable(int failureLimit, boolean shouldKill) {
+            this.failureLimit = failureLimit;
+            this.shouldKill = shouldKill;
+        }
+
+        /**
+         * Runs one check on the leader server: collects abandoned coordinators into an HTML table and mails it.
+         * Exceptions from the check or from sending the mail are logged rather than propagated.
+         */
+        @Override
+        public void run() {
+            if (!Services.get().get(JobsConcurrencyService.class).isLeader()) {
+                LOG.info("Server is not primary server. Skipping run");
+                return;
+            }
+            msg = new StringBuilder();
+            XLog.Info.get().clear();
+            msg.append("<!DOCTYPE html><html><head><style>table,th,td{border:1px solid black;border-collapse:collapse;}</style>"
+                    + "</head><body><table>");
+            addTableHeader();
+            try {
+                checkCoordJobs();
+                msg.append("</table></body></html>");
+                sendMail(msg.toString());
+            }
+            catch (Exception e) {
+                LOG.error("Error running AbandonedCoordChecker", e);
+            }
+        }
+
+        /**
+         * Queries abandoned coordinators, kills them when {@code shouldKill} is set and appends one report row per
+         * job.
+         *
+         * @throws CommandException if querying the database fails
+         */
+        private void checkCoordJobs() throws CommandException {
+            // Multiply as long: an int product overflows once job.older.than exceeds ~35791 minutes (~24.8 days).
+            long olderThanMillis = Services.get().getConf().getInt(CONF_JOB_OLDER_THAN, DEFAULT_CONF_JOB_OLDER_THAN)
+                    * 60L * 1000L;
+            Timestamp createdTS = new Timestamp(System.currentTimeMillis() - olderThanMillis);
+            List<CoordinatorJobBean> jobs;
+            try {
+                jobs = CoordJobQueryExecutor.getInstance().getList(CoordJobQuery.GET_COORD_FOR_ABANDONEDCHECK,
+                        failureLimit, createdTS);
+            }
+            catch (JPAExecutorException je) {
+                throw new CommandException(je);
+            }
+            for (CoordinatorJobBean job : jobs) {
+                String killStatus = "Coord kill is disabled";
+                LOG.info("Abandoned Coord found : " + job.getId());
+                if (shouldKill) {
+                    try {
+                        new CoordKillXCommand(job.getId()).call();
+                        LOG.info("Killed abandoned coord :  " + job.getId());
+                        killStatus = "Successful";
+                    }
+                    catch (Exception e) {
+                        LOG.error("Can't kill abandoned coord :  " + job.getId(), e);
+                        killStatus = " Failed : " + e.getMessage();
+                    }
+                }
+                addCoordToMessage(job, killStatus);
+            }
+        }
+
+        /**
+         * Appends one HTML table row describing an abandoned coordinator to the report. All values are HTML-escaped
+         * because they can contain arbitrary text (e.g. user-chosen application names or exception messages).
+         *
+         * @param job coordinator job to report; only its id, app name, user and group are read
+         * @param killStatus outcome of the kill attempt, or a note that killing is disabled
+         */
+        public void addCoordToMessage(CoordinatorJobBean job, String killStatus) {
+            String consoleUrl = JobXCommand.getJobConsoleUrl(job.getId());
+            msg.append("<tr>");
+            msg.append("<td>");
+            if (consoleUrl != null) {
+                // Link to the job console only when oozie.JobCommand.job.console.url is configured.
+                msg.append("<a href=\"").append(escape(consoleUrl)).append("\">").append(escape(job.getId()))
+                        .append("</a>");
+            }
+            else {
+                msg.append(escape(job.getId()));
+            }
+            msg.append("</td>");
+            msg.append("<td>").append(escape(job.getAppName())).append("</td>");
+            msg.append("<td>").append(escape(job.getUser())).append("</td>");
+            msg.append("<td>").append(escape(job.getGroup())).append("</td>");
+            msg.append("<td>").append(escape(killStatus)).append("</td>");
+            msg.append("</tr>");
+        }
+
+        /**
+         * HTML-escapes a report value; {@code null} is rendered as the text {@code null}.
+         */
+        private static String escape(String value) {
+            return value == null ? "null" : StringEscapeUtils.escapeHtml(value);
+        }
+
+        /**
+         * Appends the header row of the report table.
+         */
+        public void addTableHeader() {
+            msg.append("<tr>");
+            msg.append("<td>").append("Coordinator id").append("</td>");
+            msg.append("<td>").append("Coordinator name").append("</td>");
+            msg.append("<td>").append("User name").append("</td>");
+            msg.append("<td>").append("Group").append("</td>");
+            msg.append("<td>").append("Kill Status").append("</td>");
+            msg.append("</tr>");
+        }
+
+        /**
+         * Returns the HTML report built by the most recent check, or an empty string if none has been built yet.
+         */
+        @VisibleForTesting
+        public String getMessage() {
+            return msg == null ? "" : msg.toString();
+        }
+
+        /**
+         * Mails the report to the configured recipients; does nothing when no recipient is configured.
+         *
+         * @param body HTML body of the email
+         * @throws Exception if sending the email fails
+         */
+        public void sendMail(String body) throws Exception {
+            if (to == null || to.length == 0 || (to.length == 1 && StringUtils.isEmpty(to[0]))) {
+                LOG.info(TO_ADDRESS + " is not configured. Not sending email");
+                return;
+            }
+            EmailActionExecutor email = new EmailActionExecutor();
+            String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date());
+            email.email(to, new String[0], subject, body, CONTENT_TYPE);
+        }
+    }
+
+    /**
+     * Reads the service configuration and schedules an {@link AbandonedCoordCheckerRunnable} with an initial delay
+     * of {@code check.delay} minutes and an interval of {@code check.interval} minutes.
+     *
+     * @param services services instance providing the configuration and the {@link SchedulerService}
+     */
+    @Override
+    public void init(Services services) {
+        Configuration conf = services.getConf();
+        to = conf.getStrings(TO_ADDRESS);
+        int failureLen = conf.getInt(CONF_FAILURE_LEN, DEFAULT_FAILURE_LEN);
+        boolean shouldKill = conf.getBoolean(CONF_JOB_KILL, DEFAULT_JOB_KILL);
+        serverURL = conf.get(OOZIE_BASE_URL);
+
+        int delay = conf.getInt(CONF_CHECK_DELAY, DEFAULT_CHECK_DELAY);
+        int interval = conf.getInt(CONF_CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL);
+
+        Runnable actionCheckRunnable = new AbandonedCoordCheckerRunnable(failureLen, shouldKill);
+        services.get(SchedulerService.class).schedule(actionCheckRunnable, delay, interval,
+                SchedulerService.Unit.MIN);
+    }
+
+    @Override
+    public void destroy() {
+        // Nothing to clean up here; the periodic task was handed to SchedulerService in init().
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return AbandonedCoordCheckerService.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index b931d52..6a91dc6 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2203,4 +2203,40 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.AbandonedCoordCheckerService.check.interval
+        </name>
+        <value>1440</value>
+        <description>
+            Interval, in minutes, at which AbandonedCoordCheckerService should 
run.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.AbandonedCoordCheckerService.failure.limit
+        </name>
+        <value>25</value>
+        <description>
+            Failure limit. A job is considered abandoned/faulty if the total number of its actions in
+            FAILED/TIMEDOUT/SUSPENDED status is at least this value and none of its actions has SUCCEEDED.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.AbandonedCoordCheckerService.kill.jobs
+        </name>
+        <value>false</value>
+        <description>
+            If true, AbandonedCoordCheckerService will kill abandoned coords.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.AbandonedCoordCheckerService.job.older.than</name>
+        <value>2880</value>
+        <description>
+         In minutes. A job is considered abandoned/faulty only if its start time is older than this many minutes.
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
new file mode 100644
index 0000000..fa47dd6
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.service.AbandonedCoordCheckerService.AbandonedCoordCheckerRunnable;
+
+/**
+ * Tests for {@link AbandonedCoordCheckerRunnable}. Unless noted otherwise, jobs start one month in the past, i.e.
+ * well before the default 2-day job.older.than cut-off, so only their action statuses decide the outcome.
+ */
+public class TestAbandonedCoordChecker extends XDataTestCase {
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testAbandonedFailed() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+
+        final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 5);
+        addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED);
+
+        final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 4);
+        addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED);
+
+        AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5);
+        coordChecked.run();
+        String msg = coordChecked.getMessage();
+        assertTrue(msg.contains(job1.getId()));
+        assertFalse(msg.contains(job2.getId()));
+
+    }
+
+    public void testAbandoned_notAbandoned() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+
+        final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 6);
+
+        addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.SUCCEEDED,
+                CoordinatorAction.Status.FAILED);
+
+        final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true,
+                false, 6);
+
+        addRecordToCoordActionTable(job2.getId(), 6, CoordinatorAction.Status.SUCCEEDED,
+                CoordinatorAction.Status.FAILED);
+
+        AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5);
+        coordChecked.run();
+        String msg = coordChecked.getMessage();
+        assertFalse(msg.contains(job1.getId()));
+        assertFalse(msg.contains(job2.getId()));
+    }
+
+    public void testMessage_withTimedout() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+
+        final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 12);
+
+        addRecordToCoordActionTable(job1.getId(), 12, CoordinatorAction.Status.TIMEDOUT);
+
+        final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 4);
+        addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.TIMEDOUT);
+
+        AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(10);
+        coordChecked.run();
+        String msg = coordChecked.getMessage();
+        assertTrue(msg.contains(job1.getId()));
+        assertFalse(msg.contains(job2.getId()));
+
+    }
+
+    public void testMessage_withMixedStatus() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+
+        final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 5);
+
+        addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED,
+                CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT);
+
+        final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
+                false, 5);
+
+        addRecordToCoordActionTable(job2.getId(), 5, CoordinatorAction.Status.FAILED,
+                CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT);
+
+        final CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true,
+                false, 5);
+        addRecordToCoordActionTable(job3.getId(), 5, CoordinatorAction.Status.FAILED,
+                CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT);
+
+        AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5);
+        coordChecked.run();
+        String msg = coordChecked.getMessage();
+        assertTrue(msg.contains(job1.getId()));
+        assertTrue(msg.contains(job2.getId()));
+        assertFalse(msg.contains(job3.getId()));
+    }
+
+    public void testKill() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6);
+        addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED);
+        CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4);
+        addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED);
+        new AbandonedCoordCheckerRunnable(5, true).run();
+        assertEquals(CoordinatorJob.Status.KILLED,
+                CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus());
+        assertEquals(CoordinatorJob.Status.RUNNING,
+                CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus());
+    }
+
+    public void testStartTime() throws Exception {
+        Date start = new Date();
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6);
+        addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED);
+        CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4);
+        addRecordToCoordActionTable(job2.getId(), 10, CoordinatorAction.Status.FAILED);
+        new AbandonedCoordCheckerRunnable(5, true).run();
+
+        // Both jobs should still be RUNNING: their start time is inside the 2-day job.older.than window.
+        assertEquals(CoordinatorJob.Status.RUNNING,
+                CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus());
+        assertEquals(CoordinatorJob.Status.RUNNING,
+                CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus());
+    }
+
+    /**
+     * Adds {@code count} coordinator actions (numbered 1..count) to the job. Action i gets {@code status[i - 1]};
+     * actions beyond the supplied statuses reuse the last status.
+     */
+    private void addRecordToCoordActionTable(String jobId, int count, CoordinatorAction.Status... status)
+            throws Exception {
+
+        for (int i = 1; i <= count; i++) {
+            CoordinatorAction.Status jobStatus = status[status.length - 1];
+            if (i <= status.length) {
+                jobStatus = status[i - 1];
+            }
+            addRecordToCoordActionTable(jobId, i, jobStatus, "coord-action-get.xml", 0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/1122898e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b3bbb2f..7ce842b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs (puru)
 OOZIE-1847 HA - Oozie servers should shutdown (or go in safe mode) in case of 
ZK failure (puru)
 OOZIE-1957 Coord update command override group when 
oozie.service.AuthorizationService.default.group.as.acl is set and group/acl is 
not configured in job property (puru)
 OOZIE-1818 CoordMaterializeTransitionXCommand verifyPrecondition doesn't 
verify current time (puru)

Reply via email to