Repository: falcon Updated Branches: refs/heads/master fd10aa410 -> 280ea92a7
FALCON-949 Force update feature. Contributed by pavan kumar kolamuri Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/280ea92a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/280ea92a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/280ea92a Branch: refs/heads/master Commit: 280ea92a7395b07f45d31cd64a1b892de43505e9 Parents: fd10aa4 Author: Suhas Vasu <[email protected]> Authored: Mon Feb 16 14:29:43 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Mon Feb 16 14:29:43 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/falcon/cli/FalconCLI.java | 10 +++- .../org/apache/falcon/client/FalconClient.java | 17 +++++- .../workflow/engine/AbstractWorkflowEngine.java | 2 + docs/src/site/twiki/FalconCLI.twiki | 6 +++ docs/src/site/twiki/restapi/EntityTouch.twiki | 29 +++++++++++ .../workflow/engine/OozieWorkflowEngine.java | 15 ++++++ .../falcon/resource/AbstractEntityManager.java | 7 ++- .../AbstractSchedulableEntityManager.java | 27 ++++++++++ .../proxy/SchedulableEntityManagerProxy.java | 23 +++++++++ .../resource/SchedulableEntityManager.java | 11 ++++ .../java/org/apache/falcon/cli/FalconCLIIT.java | 3 ++ .../falcon/resource/EntityManagerJerseyIT.java | 54 ++++++++++++++++++++ 13 files changed, 202 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 349f4d4..8d16073 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-949 Force update feature (pavan kumar kolamuri via Suhas Vasu) IMPROVEMENTS FALCON-263 Adding documentation for params api (Ajay Yadav via Srikanth 
http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index 3620c3b..ac76a9c 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -84,6 +84,7 @@ public class FalconCLI { public static final String DEFINITION_OPT = "definition"; public static final String DEPENDENCY_OPT = "dependency"; public static final String LIST_OPT = "list"; + public static final String TOUCH_OPT = "touch"; public static final String FIELDS_OPT = "fields"; public static final String FILTER_BY_OPT = "filterBy"; @@ -453,6 +454,10 @@ public class FalconCLI { entityType, cluster, start, end, fields, filterBy, filterTags, orderBy, sortOrder, offset, numResults, numInstances)); + } else if (optionsList.contains(TOUCH_OPT)) { + validateNotEmpty(entityName, ENTITY_NAME_OPT); + colo = getColo(colo); + result = client.touch(entityType, entityName, colo).getMessage(); } else if (optionsList.contains(HELP_CMD)) { OUT.get().println("Falcon Help"); } else { @@ -613,9 +618,11 @@ public class FalconCLI { Option dependency = new Option(DEPENDENCY_OPT, false, "Gets the dependencies of entity"); Option list = new Option(LIST_OPT, false, - "List entities registerd for a type"); + "List entities registered for a type"); Option entitySummary = new Option(SUMMARY_OPT, false, "Get summary of instances for list of entities"); + Option touch = new Option(TOUCH_OPT, false, + "Force update the entity in workflow engine(even without any changes to entity)"); OptionGroup group = new OptionGroup(); group.addOption(submit); @@ -631,6 +638,7 @@ public class FalconCLI { group.addOption(dependency); group.addOption(list); group.addOption(entitySummary); + group.addOption(touch); 
Option url = new Option(URL_OPTION, true, "Falcon URL"); Option entityType = new Option(ENTITY_TYPE_OPT, true, http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 5064e46..86397c4 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -190,7 +190,8 @@ public class FalconClient { DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML), LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML), SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON), - DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML); + DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML), + TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML); private String path; private String method; @@ -381,6 +382,20 @@ public class FalconClient { orderBy, sortOrder, offset, numResults, numInstances); } + public APIResult touch(String entityType, String entityName, String colo) throws FalconCLIException { + Entities operation = Entities.TOUCH; + WebResource resource = service.path(operation.path).path(entityType).path(entityName); + if (colo != null) { + resource = resource.queryParam("colo", colo); + } + ClientResponse clientResponse = resource + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class); + checkIfSuccessful(clientResponse); + return parseAPIResult(clientResponse); + } + public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, String filterBy, String 
orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index f5b142b..6b10679 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -90,6 +90,8 @@ public abstract class AbstractWorkflowEngine { public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException; + public abstract String touch(Entity entity, String cluster) throws FalconException; + public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException; public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index d37cf8c..547aa7d 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -83,6 +83,12 @@ $FALCON_HOME/bin/falcon entity -type [feed|process] -name <<name>> -update -fil Example: $FALCON_HOME/bin/falcon entity -type process -name HourlyReportsGenerator -update -file /process/definition.xml +---+++Touch + +Touch (force update) allows an already submitted/scheduled entity to be updated in the workflow engine even when its definition has not changed. 
+ +Usage: +$FALCON_HOME/bin/falcon entity -type [feed|process] -name <<name>> -touch ---+++Status http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/docs/src/site/twiki/restapi/EntityTouch.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/EntityTouch.twiki b/docs/src/site/twiki/restapi/EntityTouch.twiki new file mode 100644 index 0000000..ffc7967 --- /dev/null +++ b/docs/src/site/twiki/restapi/EntityTouch.twiki @@ -0,0 +1,29 @@ +---++ POST api/entities/touch/:entity-type/:entity-name + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description +Force updates the entity. + +---++ Parameters + * :entity-type can be feed or process. + * :entity-name is the name of the feed or process. + +---++ Results +Result of the force update (touch) operation. + +---++ Examples +---+++ Rest Call +<verbatim> +POST http://localhost:15000/api/entities/touch/process/SampleProcess +</verbatim> +---+++ Result +<verbatim> +{ + "requestId": "touch\/default\/d6aaa328-6836-4818-a212-515bb43d8b86\n\n", + "message": "touch\/default\/SampleProcess updated successfully\n\n", + "status": "SUCCEEDED" +} +</verbatim> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index ce292bd..fd1cdac 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1069,6 +1069,21 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return result.toString(); } 
+ @Override + public String touch(Entity entity, String cluster) throws FalconException { + BundleJob bundle = findLatestBundle(entity, cluster); + Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + StringBuilder result = new StringBuilder(); + if (bundle != MISSING) { + LOG.info("Updating entity {} for cluster: {}, bundle: {}", entity.toShortString(), cluster, bundle.getId()); + String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser()); + result.append(output).append("\n"); + LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(), cluster, + bundle.getId()); + } + return result.toString(); + } + private String getUpdateString(Entity entity, Date date, BundleJob oldBundle, BundleJob newBundle) { StringBuilder builder = new StringBuilder(); builder.append(entity.toShortString()).append("/Effective Time: ").append(SchemaHelper.formatDateUTC(date)); http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 0d34ef3..caa9a74 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -121,10 +121,15 @@ public abstract class AbstractEntityManager { protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) { Set<String> colos; + final Set<String> applicableColos = getApplicableColos(type, entity); if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) { - colos = getApplicableColos(type, entity); + colos = applicableColos; } else { colos = new HashSet<String>(Arrays.asList(coloExpr.split(","))); + if 
(!applicableColos.containsAll(colos)) { + throw FalconWebException.newException("Given colos not applicable for entity operation", + Response.Status.BAD_REQUEST); + } } return colos; } http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index b8d12ee..adfef35 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import java.util.*; @@ -224,6 +225,32 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM entitySummaries.toArray(new EntitySummaryResult.EntitySummary[entitySummaries.size()])); } + /** + * Force updates an entity. 
+ * + * @param type + * @param entityName + * @return APIResult + */ + public APIResult touch(@Dimension("entityType") @PathParam("type") String type, + @Dimension("entityName") @PathParam("entity") String entityName, + @Dimension("colo") @QueryParam("colo") String colo) { + checkColo(colo); + StringBuilder result = new StringBuilder(); + try { + Entity entity = EntityUtil.getEntity(type, entityName); + Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); + for (String cluster : clusters) { + result.append(getWorkflowEngine().touch(entity, cluster)); + } + } catch (Throwable e) { + LOG.error("Touch failed", e); + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } + return new APIResult(APIResult.Status.SUCCEEDED, result.toString()); + } + + private void validateTypeForEntitySummary(String type) { EntityType entityType = EntityType.getEnum(type); if (!entityType.isSchedulable()) { http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 34ff0f7..5f711ee 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -274,6 +274,29 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana return consolidateResult(results, APIResult.class); } + @POST + @Path("touch/{type}/{entity}") + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + @Monitored(event = "touch") + @Override + public APIResult touch( + @Dimension("entityType") @PathParam("type") final String type, + 
@Dimension("entityName") @PathParam("entity") final String entityName, + @Dimension("colo") @QueryParam("colo") final String coloExpr) { + final Set<String> colosFromExp = getColosFromExpression(coloExpr, type, entityName); + return new EntityProxy(type, entityName) { + @Override + protected Set<String> getColosToApply() { + return colosFromExp; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("touch", type, entityName, colo); + } + }.execute(); + } + @GET @Path("status/{type}/{entity}") @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index 2ec7f66..a83f0cf 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -151,4 +151,15 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { return super.validate(request, type); } + @POST + @Path("touch/{type}/{entity}") + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Monitored(event = "touch") + @Override + public APIResult touch(@Dimension("entityType") @PathParam("type") String type, + @Dimension("entityName") @PathParam("entity") String entityName, + @Dimension("colo") @QueryParam("colo") String colo) { + return super.touch(type, entityName, colo); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git 
a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index 118003f..7512302 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -119,6 +119,9 @@ public class FalconCLIIT { Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName") + " -type process -file " + filePath), 0); + + Assert.assertEquals(0, + executeWithURL("entity -touch -name " + overlay.get("processName") + " -type process")); } public void testValidateValidCommands() throws Exception { http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index 40f8e04..c6fd420 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -140,6 +140,16 @@ public class EntityManagerJerseyIT { .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath())); } + private ClientResponse touch(TestContext context, Entity entity) { + WebResource resource = context.service.path("api/entities/touch/" + + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); + ClientResponse clientResponse = resource + .header("Cookie", context.getAuthenticationToken()) + .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) + .post(ClientResponse.class); + return clientResponse; + } + @Test public void testUpdateCheckUser() throws Exception { TestContext context = newContext(); @@ -343,6 +353,50 @@ public class EntityManagerJerseyIT { Assert.assertEquals(bundles.size(), 1); } + @Test + public void 
testTouchEntity() throws Exception { + TestContext context = newContext(); + context.scheduleProcess(); + OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); + List<BundleJob> bundles = OozieTestUtils.getBundles(context); + Assert.assertEquals(bundles.size(), 1); + ProxyOozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); + String bundle = bundles.get(0).getId(); + String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId(); + + //Update end time of process required for touch + Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + updateEndtime(process); + ClientResponse response = update(context, process, null); + context.assertSuccessful(response); + bundles = OozieTestUtils.getBundles(context); + Assert.assertEquals(bundles.size(), 1); + + //Calling force update + response = touch(context, process); + context.assertSuccessful(response); + OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING); + + //Assert that touch creates new bundle and old coord is running + bundles = OozieTestUtils.getBundles(context); + Assert.assertEquals(bundles.size(), 2); + CoordinatorJob coord = ozClient.getCoordJobInfo(coordId); + Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.SUCCEEDED); + + //Assert on new bundle/coord + String newBundle = null; + for (BundleJob myBundle : bundles) { + if (!myBundle.getId().equals(bundle)) { + newBundle = myBundle.getId(); + break; + } + } + + assert newBundle != null; + coord = ozClient.getCoordJobInfo(ozClient.getBundleJobInfo(newBundle).getCoordinators().get(0).getId()); + Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.PREP); + } + public void testStatus() throws Exception { TestContext context = newContext(); ClientResponse response;
