Repository: falcon Updated Branches: refs/heads/master ccdf02e7e -> fd8614538
FALCON-950 Rerun does not work on succeeded instances. Contributed by Suhas Vasu Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fd861453 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fd861453 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fd861453 Branch: refs/heads/master Commit: fd861453807246fc3af2c82b137c643dad2a0e1f Parents: ccdf02e Author: Suhas Vasu <[email protected]> Authored: Wed Mar 4 13:04:40 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Wed Mar 4 13:04:40 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../java/org/apache/falcon/cli/FalconCLI.java | 16 ++++++++- .../org/apache/falcon/client/FalconClient.java | 36 ++++++++++++++++---- .../workflow/engine/AbstractWorkflowEngine.java | 2 +- docs/src/site/twiki/FalconCLI.twiki | 6 ++-- docs/src/site/twiki/restapi/InstanceRerun.twiki | 25 +++++++++++++- .../workflow/engine/OozieWorkflowEngine.java | 9 +++-- .../resource/AbstractInstanceManager.java | 6 ++-- .../falcon/resource/channel/HTTPChannel.java | 4 +-- .../resource/proxy/InstanceManagerProxy.java | 5 +-- .../apache/falcon/resource/InstanceManager.java | 5 +-- 11 files changed, 95 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 21ee6e8..a2a54bf 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -92,6 +92,8 @@ Trunk (Unreleased) (Suhas vasu) BUG FIXES + FALCON-950 Rerun does not work on succeeded instances (Suhas Vasu) + FALCON-1048 Incorrect documentation for feed instacnce listing api. (Suhas Vasu via Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index ac76a9c..92b5347 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -95,6 +95,7 @@ public class FalconCLI { public static final String NUM_RESULTS_OPT = "numResults"; public static final String NUM_INSTANCES_OPT = "numInstances"; public static final String PATTERN_OPT = "pattern"; + public static final String FORCE_RERUN_FLAG = "force"; public static final String INSTANCE_CMD = "instance"; public static final String START_OPT = "start"; @@ -286,11 +287,15 @@ public class FalconCLI { } else if (optionsList.contains(RERUN_OPT)) { validateNotEmpty(start, START_OPT); validateNotEmpty(end, END_OPT); + boolean isForced = false; + if (optionsList.contains(FORCE_RERUN_FLAG)) { + isForced = true; + } result = ResponseHelper.getString(client .rerunInstances(type, entity, start, end, filePath, colo, clusters, sourceClusters, - lifeCycles)); + lifeCycles, isForced)); } else if (optionsList.contains(LOG_OPT)) { validateOrderBy(orderBy, instanceAction); validateFilterBy(filterBy, instanceAction); @@ -355,6 +360,12 @@ public class FalconCLI { throw new FalconCLIException("Invalid argument: sourceClusters"); } } + + if (optionsList.contains(FORCE_RERUN_FLAG)) { + if (!optionsList.contains(RERUN_OPT)) { + throw new FalconCLIException("Force option can be used only with instance rerun"); + } + } } private void entityCommand(CommandLine commandLine, FalconClient client) @@ -797,6 +808,8 @@ public class FalconCLI { "Start returning instances from this offset"); Option numResults = new Option(NUM_RESULTS_OPT, true, "Number of results to return per request"); + Option forceRerun = new Option(FORCE_RERUN_FLAG, false, + "Flag to forcefully rerun entire workflow of an instance"); instanceOptions.addOption(url); instanceOptions.addOptionGroup(group); @@ -815,6 +828,7 @@ public class FalconCLI { instanceOptions.addOption(orderBy); instanceOptions.addOption(sortOrder); instanceOptions.addOption(numResults); + instanceOptions.addOption(forceRerun); return instanceOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 86397c4..a866bb0 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -461,7 +461,8 @@ public class FalconClient { public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, String colo, - String clusters, String sourceClusters, List<LifeCycle> lifeCycles) + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + Boolean isForced) throws FalconCLIException, IOException { StringBuilder buffer = new StringBuilder(); @@ -480,7 +481,7 @@ public class FalconClient { } String temp = (buffer.length() == 0) ? null : buffer.toString(); return sendInstanceRequest(Instances.RERUN, type, entity, start, end, - getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles); + getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced); } public InstancesResult getLogsOfInstances(String type, String entity, String start, @@ -605,7 +606,8 @@ public class FalconClient { String start, String end, String runId, String colo, String fields, String filterBy, String tags, String orderBy, String sortOrder, Integer offset, - Integer numResults, Integer numInstances, String searchPattern) { + Integer numResults, Integer numInstances, String searchPattern, + Boolean isForced) { if (!StringUtils.isEmpty(fields)) { resource = resource.queryParam("fields", fields); @@ -647,6 +649,9 @@ public class FalconClient { if (!StringUtils.isEmpty(searchPattern)) { resource = resource.queryParam("pattern", searchPattern); } + if (isForced != null) { + resource = resource.queryParam("force", String.valueOf(isForced)); + } return resource; } @@ -664,7 +669,7 @@ public class FalconClient { resource = addParamsToResource(resource, start, end, null, null, fields, filterBy, filterTags, orderBy, sortOrder, - offset, numResults, numInstances, null); + offset, numResults, numInstances, null, null); ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) @@ -734,16 +739,35 @@ public class FalconClient { .getEntity(InstancesResult.class); } + private InstancesResult sendInstanceRequest(Instances instances, String type, + String entity, String start, String end, InputStream props, + String runid, String colo, List<LifeCycle> lifeCycles, + Boolean isForced) throws FalconCLIException { + return sendInstanceRequest(instances, type, entity, start, end, props, + runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS, isForced).getEntity(InstancesResult.class); + } + + + private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, String start, String end, InputStream props, String runid, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { + + return sendInstanceRequest(instances, type, entity, start, end, props, + runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null); + } + + private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, + String start, String end, InputStream props, String runid, String colo, + List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, + Integer offset, Integer numResults, Boolean isForced) throws FalconCLIException { checkType(type); WebResource resource = service.path(instances.path).path(type) .path(entity); resource = addParamsToResource(resource, start, end, runid, colo, - null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null); + null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null, isForced); if (lifeCycles != null) { checkLifeCycleOption(lifeCycles, type); @@ -800,7 +824,7 @@ public class FalconClient { WebResource resource = service.path(entities.path) .path(entityType); resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags, - orderBy, sortOrder, offset, numResults, null, searchPattern); + orderBy, sortOrder, offset, numResults, null, searchPattern, null); ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 6b10679..07fafb5 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -74,7 +74,7 @@ public abstract class AbstractWorkflowEngine { List<LifeCycle> lifeCycles) throws FalconException; public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles) throws FalconException; + List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException; public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index ef2152f..d503d22 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -142,10 +142,12 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue ---+++Rerun -Rerun option is used to rerun instances of a given process. This option is valid only for process instances in terminal state, i.e. SUCCEDDED, KILLED or FAILED. Optionally, you can specify the properties to override. +Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED. +If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun +Additionally, you can also specify properties to override via a properties file. Usage: -$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-file <<properties file>>] +$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>] ---+++Resume http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/restapi/InstanceRerun.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki index d98ae3a..ec30a1e 100644 --- a/docs/src/site/twiki/restapi/InstanceRerun.twiki +++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki @@ -5,7 +5,7 @@ * <a href="#Examples">Examples</a> ---++ Description -Rerun instances of an entity. +Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. ---++ Parameters * :entity-type can either be a feed or a process. @@ -13,6 +13,7 @@ Rerun instances of an entity. * start is the start time of the instance that you want to refer to * end is the end time of the instance that you want to refer to * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process. + * force <optional param> can be used to forcefully rerun the entire instance. ---++ Results Results of the rerun command. @@ -40,3 +41,25 @@ POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&star "status": "SUCCEEDED" } </verbatim> + +<verbatim> +POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true +</verbatim> +---+++ Result +<verbatim> +{ + "instances": [ + { + "details": "", + "startTime": "2013-10-21T15:10:47-07:00", + "cluster": "primary-cluster", + "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W", + "status": "RUNNING", + "instance": "2012-04-03T07:00Z" + } + ], + "requestId": "default\/7a3582bd-608c-45a7-9b74-1837b51ba6d5\n", + "message": "default\/RERUN\n", + "status": "SUCCEEDED" +} +</verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 2733cca..62c04ea 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -496,7 +496,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, - Properties props, List<LifeCycle> lifeCycles) throws FalconException { + Properties props, List<LifeCycle> lifeCycles, + Boolean isForced) throws FalconException { + if (isForced != null && isForced) { + props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + } return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles); } @@ -1317,7 +1321,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { jobprops.putAll(props); } //if user has set any of these oozie rerun properties then force rerun flag is ignored - if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES) && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) { + if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) + && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); } jobprops.remove(OozieClient.COORDINATOR_APP_PATH); http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index ed30869..f9f41d3 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -443,9 +443,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, HttpServletRequest request, - String colo, List<LifeCycle> lifeCycles) { + String colo, List<LifeCycle> lifeCycles, Boolean isForced) { checkColo(colo); checkType(type); try { @@ -458,12 +459,13 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Properties props = getProperties(request); AbstractWorkflowEngine wfEngine = getWorkflowEngine(); return wfEngine.reRunInstances(entityObject, - startAndEndDate.first, startAndEndDate.second, props, lifeCycles); + startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced); } catch (Exception e) { LOG.error("Failed to rerun instances", e); throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST); } } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck private Properties getProperties(HttpServletRequest request) throws IOException { Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java index 7f261ce..b8db4b5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java +++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java @@ -138,8 +138,8 @@ public class HTTPChannel extends AbstractChannel { Annotation[][] paramAnnotations = method.getParameterAnnotations(); StringBuilder queryString = new StringBuilder("?"); for (int index = 0; index < args.length; index++) { - if (args[index] instanceof String) { - String arg = (String) args[index]; + if (args[index] instanceof String || args[index] instanceof Boolean) { + String arg = String.valueOf(args[index]); for (int annotation = 0; annotation < paramAnnotations[index].length; annotation++) { Annotation paramAnnotation = paramAnnotations[index][annotation]; String annotationClass = paramAnnotation.annotationType().getName(); http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index e6cf904..e304bd8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -327,14 +327,15 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("end-time") @QueryParam("end") final String endStr, @Context HttpServletRequest request, @Dimension("colo") @QueryParam("colo") String colo, - @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { + @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles, + @Dimension("force") @QueryParam("force") final Boolean isForced) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("reRunInstance", - type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles); + type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced); } }.execute(colo, type, entity); } http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index d4e0ae0..dc533a2 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -224,8 +224,9 @@ public class InstanceManager extends AbstractInstanceManager { @Dimension("end-time") @QueryParam("end") String endStr, @Context HttpServletRequest request, @Dimension("colo") @QueryParam("colo") String colo, - @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) { - return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles); + @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles, + @Dimension("force") @QueryParam("force") Boolean isForced) { + return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles, isForced); } //RESUME CHECKSTYLE CHECK ParameterNumberCheck
