Repository: falcon Updated Branches: refs/heads/0.8 660711f83 -> 5cab56cf8
FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5cab56cf Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5cab56cf Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5cab56cf Branch: refs/heads/0.8 Commit: 5cab56cf82576a3fc28bbeb41f4f799519ce8053 Parents: 660711f Author: Pallavi Rao <[email protected]> Authored: Wed Oct 14 12:39:25 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Oct 14 12:39:25 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/client/AbstractFalconClient.java | 36 ++++++++++++++ .../oozie/client/LocalProxyOozieClient.java | 20 +++++++- .../apache/falcon/unit/FalconUnitClient.java | 18 +++++++ .../unit/LocalSchedulableEntityManager.java | 16 +++++++ .../apache/falcon/unit/FalconUnitTestBase.java | 11 +++-- .../org/apache/falcon/unit/TestFalconUnit.java | 30 ++++++++++++ unit/src/test/resources/process1.xml | 50 ++++++++++++++++++++ 8 files changed, 178 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2dc7e3c..e92c81e 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -21,6 +21,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal via Pallavi Rao) + FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava) FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java 
---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 2358289..b889931 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -81,4 +81,40 @@ public abstract class AbstractFalconClient { Integer offset, Integer numResults, String doAsUser) throws FalconCLIException; //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + /** + * Suspend an entity. + * @param entityType Valid options are feed or process. + * @param entityName Name of the entity. + * @param colo Colo on which the query should be run. + * @param doAsUser proxy user + * @return Status of the entity. + * @throws FalconCLIException + */ + public abstract APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException; + + /** + * Resume a suspended entity. + * @param entityType Valid options are feed or process. + * @param entityName Name of the entity. + * @param colo Colo on which the query should be run. + * @param doAsUser proxy user + * @return Result of the resume command. + * @throws FalconCLIException + */ + public abstract APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException; + + /** + * Get status of the entity. + * @param entityType Valid options are feed or process. + * @param entityName Name of the entity. + * @param colo Colo on which the query should be run. + * @param doAsUser proxy user + * @return Status of the entity. 
+ * @throws FalconCLIException + */ + public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index 217cec9..756828f 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -61,6 +61,22 @@ public class LocalProxyOozieClient extends OozieClient { return localOozieClientCoord; } + private OozieClient getClient(String jobId) { + if (jobId != null) { + if (jobId.toUpperCase().endsWith("B")) { //checking if it's a bundle job + return getLocalOozieClientBundle(); + } else if (jobId.toUpperCase().endsWith("C")) { //checking if it's a coordinator job + return getLocalOozieClientCoord(); + } else if (jobId.toUpperCase().endsWith("W")) { //checking if it's a workflow job + return getLocalOozieClient(); + } else { + throw new IllegalArgumentException("Couldn't decide the type for the job-id " + jobId); + } + } else { + throw new IllegalArgumentException("Job-id cannot be null"); + } + } + @Override public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { return getLocalOozieClientBundle().getBundleJobInfo(jobId); @@ -155,12 +171,12 @@ public class LocalProxyOozieClient extends OozieClient { @Override public void suspend(String jobId) throws OozieClientException { - throw new IllegalStateException("Suspend not supported "); + getClient(jobId).suspend(jobId); } @Override public void resume(String jobId) throws OozieClientException { - throw new IllegalStateException("Resume not supported "); + 
getClient(jobId).resume(jobId); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 3ce261e..783af19 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -198,6 +198,24 @@ public class FalconUnitClient extends AbstractFalconClient { return null; } + @Override + public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException { + return localSchedulableEntityManager.suspend(entityType.name(), entityName, colo); + } + + @Override + public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException { + return localSchedulableEntityManager.resume(entityType.name(), entityName, colo); + } + + @Override + public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws + FalconCLIException { + return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo); + } + private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) { if (entityType == EntityType.FEED) { return checkAndUpdateFeedClusters(entity, cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java index d793cf2..8b1c435 100644 --- 
a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.unit; +import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; /** @@ -24,4 +25,19 @@ import org.apache.falcon.resource.AbstractSchedulableEntityManager; */ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager { // Created for future purposes to add all entity API's here for falcon unit. + + public LocalSchedulableEntityManager() {} + + public APIResult suspend(String type, String entity, String colo) { + return super.suspend(null, type, entity, colo); + } + + public APIResult resume(String type, String entity, String colo) { + return super.resume(null, type, entity, colo); + } + + public APIResult getStatus(String type, String entity, String colo) { + return super.getStatus(type, entity, colo); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 45b88f0..d12efbc 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -43,7 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterTest; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import java.io.File; @@ -120,8 +120,13 @@ public class FalconUnitTestBase { FalconUnit.cleanup(); } - @AfterTest - public void cleanUpActionXml() throws IOException { + @AfterMethod + public void 
cleanUpActionXml() throws IOException, FalconException { + for (EntityType type : EntityType.values()) { + for (String name : ConfigurationStore.get().getEntities(type)) { + ConfigurationStore.get().remove(type, name); + } + } //Needed since oozie writes action xml to current directory. FileUtils.deleteQuietly(new File("action.xml")); FileUtils.deleteQuietly(new File(".action.xml.crc")); http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index d2e574b..d504bd2 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -87,4 +87,34 @@ public class TestFalconUnit extends FalconUnitTestBase { Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status); Assert.assertFalse(fs.exists(new Path(inPath))); } + + @Test + public void testSuspendAndResume() throws Exception { + // submit with default props + submitCluster(); + // submitting feeds + APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); + assertStatus(result); + result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml")); + assertStatus(result); + // submitting and scheduling process + String scheduleTime = "2015-06-20T00:00Z"; + createData("in", "local", scheduleTime, "input.txt"); + result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr"); + assertStatus(result); + result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"), + true, ""); + assertStatus(result); + waitForStatus(EntityType.PROCESS, "process1", scheduleTime); + result = getClient().suspend(EntityType.PROCESS, "process1", "local", null); + assertStatus(result); + result = 
getClient().getStatus(EntityType.PROCESS, "process1", "local", null); + assertStatus(result); + Assert.assertEquals(result.getMessage(), "SUSPENDED"); + result = getClient().resume(EntityType.PROCESS, "process1", "local", null); + assertStatus(result); + result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null); + assertStatus(result); + Assert.assertEquals(result.getMessage(), "RUNNING"); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/5cab56cf/unit/src/test/resources/process1.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml new file mode 100644 index 0000000..37dbb9c --- /dev/null +++ b/unit/src/test/resources/process1.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="process1" xmlns="uri:falcon:process:0.1"> + <clusters> + <cluster name="local"> + <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/> + </cluster> + </clusters> + + <parallel>5</parallel> + <order>FIFO</order> + <frequency>minutes(1)</frequency> + <timezone>UTC</timezone> + + <inputs> + <!-- In the workflow, the input paths will be available in a variable 'inpaths' --> + <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" /> + </inputs> + + <outputs> + <!-- In the workflow, the output path will be available in a variable 'outpath' --> + <output name="outpath" feed="out" instance="now(0,0)"/> + </outputs> + + <properties> + <!-- In the workflow, these properties will be available with variable - key --> + <property name="queueName" value="default"/> + <!-- The schedule time available as a property in workflow --> + <property name="time" value="${instanceTime()}"/> + </properties> + + <workflow engine="oozie" path="/app/oozie-mr"/> +</process>
