Repository: falcon Updated Branches: refs/heads/master 5d8b36c16 -> b806b32fd
FALCON-1434 Enhance schedule API to accept key-value properties(Contributed by Pallavi Rao) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b806b32f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b806b32f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b806b32f Branch: refs/heads/master Commit: b806b32fd6829b4f6f49e612a6668df55c49bd03 Parents: 5d8b36c Author: Pallavi Rao <[email protected]> Authored: Fri Sep 18 12:20:12 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Sep 18 12:20:12 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../java/org/apache/falcon/cli/FalconCLI.java | 9 +++-- .../falcon/client/AbstractFalconClient.java | 4 +-- .../org/apache/falcon/client/FalconClient.java | 36 ++++++++++++-------- .../org/apache/falcon/entity/EntityUtil.java | 23 +++++++++++++ .../workflow/engine/AbstractWorkflowEngine.java | 3 +- .../apache/falcon/entity/EntityUtilTest.java | 30 ++++++++++++++++ .../workflow/engine/OozieWorkflowEngine.java | 3 +- .../AbstractSchedulableEntityManager.java | 19 ++++++----- .../proxy/SchedulableEntityManagerProxy.java | 11 +++--- .../apache/falcon/unit/FalconUnitClient.java | 8 ++--- .../apache/falcon/unit/FalconUnitTestBase.java | 13 ++++--- .../org/apache/falcon/unit/TestFalconUnit.java | 3 +- .../resource/SchedulableEntityManager.java | 5 +-- .../java/org/apache/falcon/cli/FalconCLIIT.java | 3 +- .../falcon/resource/EntityManagerJerseyIT.java | 4 +-- .../org/apache/falcon/resource/TestContext.java | 20 ++++++----- 17 files changed, 139 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 013e0fd..25f02f0 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1434 Enhance schedule API to accept key-value properties(Pallavi Rao) + FALCON-1425 Provide Email based plugin to send Notification once instance completed(Peeyush Bishnoi via Ajay Yadava) FALCON-1205 SLAService to keep track of missing SLAs for feeds(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index e684678..c914649 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -91,6 +91,7 @@ public class FalconCLI { public static final String LIST_OPT = "list"; public static final String TOUCH_OPT = "touch"; public static final String SKIPDRYRUN_OPT = "skipDryRun"; + public static final String PROPS_OPT = "properties"; public static final String FIELDS_OPT = "fields"; public static final String FILTER_BY_OPT = "filterBy"; @@ -441,6 +442,8 @@ public class FalconCLI { skipDryRun = true; } + String userProps = commandLine.getOptionValue(PROPS_OPT); + EntityType entityTypeEnum = null; if (optionsList.contains(LIST_OPT)) { if (entityType == null) { @@ -476,7 +479,7 @@ public class FalconCLI { } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser).getMessage(); + result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage(); } else if (optionsList.contains(VALIDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); @@ -484,7 +487,7 @@ public class FalconCLI { } else if (optionsList.contains(SCHEDULE_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser).getMessage(); + result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser, userProps).getMessage(); } else if (optionsList.contains(SUSPEND_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); @@ -759,6 +762,7 @@ public class FalconCLI { Option path = new Option(PATH_OPT, true, "Path for a feed's instance"); Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine"); Option doAs = new Option(DO_AS_OPT, true, "doAs user"); + Option userProps = new Option(PROPS_OPT, true, "User supplied comma separated key value properties"); entityOptions.addOption(url); entityOptions.addOption(path); @@ -782,6 +786,7 @@ public class FalconCLI { entityOptions.addOption(numInstances); entityOptions.addOption(skipDryRun); entityOptions.addOption(doAs); + entityOptions.addOption(userProps); return entityOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 1146011..265e08c 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -48,7 +48,7 @@ public abstract class AbstractFalconClient { * @return * @throws FalconCLIException */ - public abstract APIResult schedule(EntityType entityType, String entityName, - String colo, Boolean skipDryRun, String doAsuser) throws FalconCLIException; + public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun, + String doAsuser, String properties) throws FalconCLIException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 6075f5c..981559b 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -287,32 +287,32 @@ public class FalconClient extends AbstractFalconClient { } public APIResult schedule(EntityType entityType, String entityName, String colo, - Boolean skipDryRun, String doAsUser) + Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { return sendEntityRequest(Entities.SCHEDULE, entityType, entityName, - colo, skipDryRun, doAsUser); + colo, skipDryRun, doAsUser, properties); } public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException { - return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser); + return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser, null); } public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException { - return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser); + return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser, null); } public APIResult delete(EntityType entityType, String entityName, String doAsUser) throws FalconCLIException { - return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser); + return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser, null); } @@ -321,7 +321,7 @@ public class FalconClient extends AbstractFalconClient { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.VALIDATE, entityType, - entityStream, null, skipDryRun, doAsUser); + entityStream, null, skipDryRun, doAsUser, null); } public APIResult submit(String entityType, String filePath, String doAsUser) @@ -329,7 +329,7 @@ public class FalconClient extends AbstractFalconClient { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.SUBMIT, entityType, - entityStream, null, null, doAsUser); + entityStream, null, null, doAsUser, null); } public APIResult update(String entityType, String entityName, String filePath, @@ -353,18 +353,18 @@ public class FalconClient extends AbstractFalconClient { } public APIResult submitAndSchedule(String entityType, String filePath, - Boolean skipDryRun, String doAsUser) + Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE, - entityType, entityStream, null, skipDryRun, doAsUser); + entityType, entityStream, null, skipDryRun, doAsUser, properties); } public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException { - return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser); + return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser, null); } public Entity getDefinition(String entityType, String entityName, String doAsUser) @@ -641,7 +641,7 @@ public class FalconClient extends AbstractFalconClient { private APIResult sendEntityRequest(Entities entities, EntityType entityType, String entityName, String colo, Boolean skipDryRun, - String doAsUser) throws FalconCLIException { + String doAsUser, String properties) throws FalconCLIException { WebResource resource = service.path(entities.path) .path(entityType.toString().toLowerCase()).path(entityName); @@ -655,6 +655,10 @@ public class FalconClient extends AbstractFalconClient { resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); } + if (StringUtils.isNotEmpty(properties)) { + resource = resource.queryParam("properties", properties); + } + ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(entities.mimeType).type(MediaType.TEXT_XML) @@ -788,8 +792,8 @@ public class FalconClient extends AbstractFalconClient { } private APIResult sendEntityRequestWithObject(Entities entities, String entityType, - Object requestObject, String colo, - Boolean skipDryRun, String doAsUser) throws FalconCLIException { + Object requestObject, String colo, Boolean skipDryRun, + String doAsUser, String properties) throws FalconCLIException { WebResource resource = service.path(entities.path) .path(entityType); if (colo != null) { @@ -803,6 +807,10 @@ public class FalconClient extends AbstractFalconClient { if (StringUtils.isNotEmpty(doAsUser)) { resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); } + + if (StringUtils.isNotEmpty(properties)) { + resource = resource.queryParam("properties", properties); + } ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(entities.mimeType).type(MediaType.TEXT_XML) @@ -1094,7 +1102,7 @@ public class FalconClient extends AbstractFalconClient { RecipeTool.main(args); } validate(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser); - return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser); + return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser, null); } catch (Exception e) { throw new FalconCLIException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 646afc3..ad41674 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -950,4 +950,27 @@ public final class EntityUtil { throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType()); } } + + + /** + * @param properties - String of format key1:value1, key2:value2 + * @return + */ + public static Map<String, String> getPropertyMap(String properties) { + Map<String, String> props = new HashMap<>(); + if (StringUtils.isNotEmpty(properties)) { + String[] kvPairs = properties.split(","); + for (String kvPair : kvPairs) { + String[] keyValue = kvPair.trim().split(":", 2); + if (keyValue.length == 2 && !keyValue[0].trim().isEmpty() && !keyValue[1].trim().isEmpty()) { + props.put(keyValue[0].trim(), keyValue[1].trim()); + } else { + throw new IllegalArgumentException("Found invalid property " + keyValue[0] + + ". Schedule properties must be comma separated key-value pairs. " + + " Example: key1:value1,key2:value2"); + } + } + } + return props; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index ea86c2a..265106b 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -51,7 +51,8 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isAlive(Cluster cluster) throws FalconException; - public abstract void schedule(Entity entity, Boolean skipDryRun) throws FalconException; + public abstract void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) + throws FalconException; public abstract String suspend(Entity entity) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java index f6a4679..d022bae 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java @@ -43,6 +43,7 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Map; import java.util.Properties; import java.util.TimeZone; @@ -375,4 +376,33 @@ public class EntityUtilTest extends AbstractTestBase { {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), true, false}, }; } + + @Test + public void testStringToProps() { + String testPropsString = "key1:value1,key2 : value2 , key3: value3, key4:value4:test"; + Map<String, String> props = EntityUtil.getPropertyMap(testPropsString); + Assert.assertEquals(props.size(), 4); + for (int i = 1; i <= 3; i++) { + Assert.assertEquals(props.get("key" + i), "value" + i); + } + Assert.assertEquals(props.get("key4"), "value4:test"); + } + + @Test (expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Found invalid property .*", + dataProvider = "InvalidProps") + public void testInvalidStringToProps(String propString) { + String[] invalidProps = {"key1", "key1=value1", "key1:value1,key2=value2, :value"}; + EntityUtil.getPropertyMap(propString); + } + + @DataProvider(name = "InvalidProps") + public Object[][] getInvalidProps() { + return new Object[][]{ + {"key1"}, + {"key1=value1"}, + {"key1:value1,key2=value2"}, + {":value"}, + }; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 5f79ca1..96661ad 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -148,7 +148,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void schedule(Entity entity, Boolean skipDryRun) throws FalconException { + public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException { Map<String, BundleJob> bundleMap = findLatestBundle(entity); List<String> schedClusters = new ArrayList<String>(); for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) { @@ -172,7 +172,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster); continue; } - //Do dryRun of coords before schedule as schedule is asynchronous dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun); scheduleEntity(clusterName, properties, entity); http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index 61638f3..3280789 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -39,8 +39,8 @@ import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -67,10 +67,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entity, @Dimension("colo") @PathParam("colo") String colo, - @QueryParam("skipDryRun") Boolean skipDryRun) { + @QueryParam("skipDryRun") Boolean skipDryRun, + @QueryParam("properties") String properties) { checkColo(colo); try { - scheduleInternal(type, entity, skipDryRun); + scheduleInternal(type, entity, skipDryRun, EntityUtil.getPropertyMap(properties)); return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled successfully"); } catch (Throwable e) { LOG.error("Unable to schedule workflow", e); @@ -78,8 +79,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } } - private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun) - throws FalconException, AuthorizationException { + private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun, + Map<String, String> properties) throws FalconException, AuthorizationException { checkSchedulableEntity(type); Entity entityObj = null; @@ -91,7 +92,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM + entityObj.toShortString()); } LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName()); - getWorkflowEngine().schedule(entityObj, skipDryRun); + getWorkflowEngine().schedule(entityObj, skipDryRun, properties); } catch (Exception e) { throw new FalconException("Entity schedule failed for " + type + ": " + entity, e); } finally { @@ -100,7 +101,6 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM LOG.info("Memory lock released for {}", entityObj.toShortString()); } } - } /** @@ -112,12 +112,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM public APIResult submitAndSchedule( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("colo") @PathParam("colo") String colo, - @QueryParam("skipDryRun") Boolean skipDryRun) { + @QueryParam("skipDryRun") Boolean skipDryRun, + @QueryParam("properties") String properties) { checkColo(colo); try { checkSchedulableEntity(type); Entity entity = submitInternal(request, type); - scheduleInternal(type, entity.getName(), skipDryRun); + scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties)); return new APIResult(APIResult.Status.SUCCEEDED, entity.getName() + "(" + type + ") scheduled successfully"); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 23f1605..61a80c1 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -389,7 +389,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Dimension("entityType") @PathParam("type") final String type, @Dimension("entityName") @PathParam("entity") final String entity, @Dimension("colo") @QueryParam("colo") final String coloExpr, - @QueryParam("skipDryRun") final Boolean skipDryRun) { + @QueryParam("skipDryRun") final Boolean skipDryRun, + @QueryParam("properties") final String properties) { final HttpServletRequest bufferedRequest = getBufferedRequest(request); return new EntityProxy(type, entity) { @@ -400,7 +401,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun); + return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun, + properties); } }.execute(); } @@ -414,12 +416,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana public APIResult submitAndSchedule( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("colo") @QueryParam("colo") String coloExpr, - @QueryParam("skipDryRun") Boolean skipDryRun) { + @QueryParam("skipDryRun") Boolean skipDryRun, + @QueryParam("properties") String properties) { BufferedRequest bufferedRequest = new BufferedRequest(request); String entity = getEntity(bufferedRequest, type).getName(); Map<String, APIResult> results = new HashMap<String, APIResult>(); results.put("submit", submit(bufferedRequest, type, coloExpr)); - results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun)); + results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun, properties)); return consolidateResult(results, APIResult.class); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index d907683..169614b 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -119,8 +119,8 @@ public class FalconUnitClient extends AbstractFalconClient { */ @Override public APIResult schedule(EntityType entityType, String entityName, String cluster, - Boolean skipDryRun, String doAsUser) throws FalconCLIException { - return schedule(entityType, entityName, null, 0, cluster, skipDryRun); + Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { + return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties); } @@ -134,7 +134,7 @@ public class FalconUnitClient extends AbstractFalconClient { * @return boolean */ public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances, - String cluster, Boolean skipDryRun) throws FalconCLIException { + String cluster, Boolean skipDryRun, String properties) throws FalconCLIException { try { FalconUnitHelper.checkSchedulableEntity(entityType.toString()); Entity entity = EntityUtil.getEntity(entityType, entityName); @@ -147,7 +147,7 @@ public class FalconUnitClient extends AbstractFalconClient { if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) { updateStartAndEndTime((Process) entity, startTime, numInstances, cluster); } - workflowEngine.schedule(entity, skipDryRun); + workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties)); LOG.info(entityName + " is scheduled successfully"); return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully"); } catch (FalconException e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index df73628..995af2b 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -147,24 +147,27 @@ public class FalconUnitTestBase { } public APIResult scheduleProcess(String processName, String startTime, int numInstances, - String cluster, String localWfPath, Boolean skipDryRun) throws FalconException, - IOException, FalconCLIException { + String cluster, String localWfPath, Boolean skipDryRun, + String properties) throws FalconException, IOException, FalconCLIException { Process processEntity = configStore.get(EntityType.PROCESS, processName); if (processEntity == null) { throw new FalconException("Process not found " + processName); } String workflowPath = processEntity.getWorkflow().getPath(); fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath)); - return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun); + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, + skipDryRun, properties); } public APIResult scheduleProcess(String processName, String startTime, int numInstances, - String cluster, Boolean skipDryRun) throws FalconException, FalconCLIException { + String cluster, Boolean skipDryRun, + String properties) throws FalconException, FalconCLIException { Process processEntity = configStore.get(EntityType.PROCESS, processName); if (processEntity == null) { throw new FalconException("Process not found " + processName); } - return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun); + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, + skipDryRun, properties); } private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) { http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 498f50e..fa9c664 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -44,7 +44,8 @@ public class TestFalconUnit extends FalconUnitTestBase { createData("in", "local", scheduleTime, "input.txt"); result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr"); assertStatus(result); - result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), true); + result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), + true, ""); assertStatus(result); waitForStatus(EntityType.PROCESS, "process", scheduleTime); InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS, http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index 3bafb25..1c0fc74 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -129,8 +129,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entity, @Dimension("colo") @QueryParam("colo") String colo, - @QueryParam("skipDryRun") Boolean skipDryRun) { - return super.schedule(request, type, entity, colo, skipDryRun); + @QueryParam("skipDryRun") Boolean skipDryRun, + @QueryParam("properties") String properties) { + return super.schedule(request, type, entity, colo, skipDryRun, properties); } @POST http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index b859256..5ed0a4e 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -184,7 +184,8 @@ public class FalconCLIIT { Assert.assertEquals(executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0); - Assert.assertEquals(executeWithURL("entity -schedule -type process -name " + overlay.get("processName")), 0); + Assert.assertEquals(executeWithURL("entity -schedule -type process -name " + overlay.get("processName") + + " -properties key:value"), 0); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index 220e5a7..50d5b94 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -642,9 +642,9 @@ public class EntityManagerJerseyIT { File tmpFile = TestContext.getTempFile(); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); if (withDoAs) { - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "testUser"); + context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "testUser", null); } else { - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay); + context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "", "key1:value1"); } OozieTestUtils.waitForBundleStart(context, Status.RUNNING); http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index f031137..c9e9d4f 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -231,20 +231,20 @@ public class TestContext { } public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception { - scheduleProcess(processTemplate, overlay, true, null, ""); + scheduleProcess(processTemplate, overlay, true, null, "", null); } public void scheduleProcess(String processTemplate, Map<String, String> overlay, - Boolean skipDryRun, final String doAsUSer) throws Exception { - scheduleProcess(processTemplate, overlay, true, skipDryRun, doAsUSer); + Boolean skipDryRun, final String doAsUSer, String properties) throws Exception { + scheduleProcess(processTemplate, overlay, true, skipDryRun, doAsUSer, properties); } public void scheduleProcess(String processTemplate, Map<String, String> overlay, boolean succeed) throws Exception{ - scheduleProcess(processTemplate, overlay, succeed, null, ""); + scheduleProcess(processTemplate, overlay, succeed, null, "", null); } public void scheduleProcess(String processTemplate, Map<String, String> overlay, boolean succeed, - Boolean skipDryRun, final String doAsUser) throws Exception { + Boolean skipDryRun, final String doAsUser, String properties) throws Exception { ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); assertSuccessful(response); @@ -254,7 +254,7 @@ public class TestContext { response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED); assertSuccessful(response); - response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun, doAsUser); + response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun, doAsUser, properties); if (succeed) { assertSuccessful(response); } else { @@ -289,12 +289,12 @@ public class TestContext { public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType) throws Exception { - return submitAndSchedule(template, overlay, entityType, null, ""); + return submitAndSchedule(template, overlay, entityType, null, "", null); } public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType, Boolean skipDryRun, - final String doAsUser) throws Exception { + final String doAsUser, String properties) throws Exception { String tmpFile = overlayParametersOverTemplate(template, overlay); ServletInputStream rawlogStream = getServletInputStream(tmpFile); @@ -308,6 +308,10 @@ public class TestContext { resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); } + if (StringUtils.isNotEmpty(properties)) { + resource = resource.queryParam("properties", properties); + } + return resource.header("Cookie", getAuthenticationToken()) .accept(MediaType.TEXT_XML) .type(MediaType.TEXT_XML)
