Repository: falcon Updated Branches: refs/heads/master 943fc11d7 -> 49050c84c
FALCON-1795 Kill api not killing waiting/ready instances Author: sandeep <[email protected]> Reviewers: Ajay Yadava <[email protected]>, Pavan Kumar, Peeyush Bishnoi Closes #22 from sandeepSamudrala/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49050c84 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49050c84 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49050c84 Branch: refs/heads/master Commit: 49050c84c5f6b3ff364d4b7ec795b9861f4d9973 Parents: 943fc11 Author: sandeep <[email protected]> Authored: Fri Feb 5 21:06:39 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Fri Feb 5 21:06:39 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../workflow/engine/OozieWorkflowEngine.java | 20 ++++++++++++++++++- .../client/LocalOozieClientCoordProxy.java | 21 ++++++++++++++++++++ .../oozie/client/LocalProxyOozieClient.java | 5 +++++ .../apache/falcon/unit/FalconUnitTestBase.java | 13 ++++++++++++ .../org/apache/falcon/unit/TestFalconUnit.java | 16 +++++++++++++++ 6 files changed, 76 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3dc3aa3..01e16b5 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,6 +30,8 @@ Trunk FALCON-1770 Update README file (Ajay Yadava) BUG FIXES + FALCON-1795 Kill api not killing waiting/ready instances + FALCON-1804 Non-SLA feed throws NullPointerException. FALCON-1806 Update documentation for Import and Export. (Venkatesan Ramachandran via Balu Vellanki) http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 04f5e93..ebf23da 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -901,9 +901,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { switch (action) { case KILL: - if (jobInfo == null || !WF_KILL_PRECOND.contains(jobInfo.getStatus())) { + if (jobInfo == null) { + StringBuilder scope = new StringBuilder(); + scope.append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())).append("::") + .append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())); + kill(cluster, coordinatorAction.getJobId(), "date", scope.toString()); + status = Status.KILLED.name(); break; } + if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) { + break; + } + kill(cluster, jobInfo.getId()); status = Status.KILLED.name(); @@ -1628,6 +1637,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private void kill(String cluster, String jobId, String rangeType, String scope) throws FalconException { + try { + OozieClientFactory.get(cluster).kill(jobId, rangeType, scope); + LOG.info("Killed job {} for instances {} on cluster {}", jobId, scope, cluster); + } catch (OozieClientException e) { + throw new FalconException(e); + } + } + private void kill(String cluster, String jobId) throws FalconException { try { OozieClientFactory.get(cluster).kill(jobId); http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java index ff4561b..d0bfbfc 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java @@ -23,6 +23,8 @@ import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.CoordinatorEngineException; import org.apache.oozie.LocalOozieClientCoord; +import java.util.List; + /** * Client API to submit and manage Oozie Coord jobs against an Oozie * intance. @@ -75,4 +77,23 @@ public class LocalOozieClientCoordProxy extends LocalOozieClientCoord { throw new OozieClientException(e.getErrorCode().toString(), e); } } + + /** + * Kill coordinator actions. + * + * @param jobId coordinator Job Id + * @param rangeType type 'date' if -date is used, 'action-num' if -action is used + * @param scope kill scope for date or action nums + * @return list of coordinator actions that underwent kill + * @throws OozieClientException thrown if some actions could not be killed. + */ + @Override + public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException { + try { + coordEngine.killActions(jobId, rangeType, scope).getCoordActions(); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index 81f4c54..7bf5c37 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -193,6 +193,11 @@ public class LocalProxyOozieClient extends OozieClient { } @Override + public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException { + return getClient(jobId).kill(jobId, rangeType, scope); + } + + @Override public void change(final String jobId, final String changeValue) throws OozieClientException { getClient(jobId).change(jobId, changeValue); } http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index fb30a55..70e1de9 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -285,6 +285,19 @@ public class FalconUnitTestBase { fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath)); } + public void deleteData(String feedName, String cluster) throws FalconException, ParseException, IOException { + Entity existingEntity = configStore.get(EntityType.FEED, feedName); + if (existingEntity == null) { + throw new FalconException("Feed Not Found " + feedName); + } + Feed feed = (Feed) existingEntity; + Storage rawStorage = FeedHelper.createStorage(cluster, feed); + String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA); + + Path feedBasePath = FeedHelper.getFeedBasePath(feedPathTemplate); + fs.delete(feedBasePath, true); + } + protected String getFeedPathForTS(String cluster, String feedName, String timeStamp) throws FalconException, ParseException { Entity existingEntity = configStore.get(EntityType.FEED, feedName); http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 7c76660..aaf2b37 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -231,6 +231,22 @@ public class TestFalconUnit extends FalconUnitTestBase { } @Test + public void testKillWaitingInstances() throws Exception { + submitClusterAndFeeds(); + InstancesResult.WorkflowStatus currentStatus; + deleteData(INPUT_FEED_NAME, CLUSTER_NAME); + submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); + + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.WAITING); + getClient().killInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, + CLUSTER_NAME, null, null, null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.KILLED); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.KILLED); + } + + @Test public void testProcessInstanceManagementAPI1() throws Exception { submitClusterAndFeeds(); // submitting and scheduling process
