Repository: falcon Updated Branches: refs/heads/master 3f6b69024 -> 860866531
FALCON-2235 Suspend/Resume API support for extension job (user extension) Author: sandeep <sandys...@gmail.com> Reviewers: @pallavi-rao Closes #336 from sandeepSamudrala/FALCON-2235 and squashes the following commits: f1f1f03 [sandeep] FALCON-2235 Incorporated review comments 554824d [sandeep] FALCON-2235 new bufferedRequest to let mark/reset apis validation work for the streams 80ffd94 [sandeep] FALCON-2235 Incorporated Review comments 73a57f8 [sandeep] FALCON-2235 Suspend/Resume API support for extension job (user extension) 32e9982 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2235 73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon 456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/86086653 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/86086653 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/86086653 Branch: refs/heads/master Commit: 86086653134d9c0ccd854237f40a6a15276c0a41 Parents: 3f6b690 Author: sandeep <sandys...@gmail.com> Authored: Fri Jan 6 13:29:02 2017 +0530 Committer: Pallavi Rao <pallavi....@inmobi.com> Committed: Fri Jan 6 13:29:02 2017 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 6 +- .../falcon/client/AbstractFalconClient.java | 19 +++ .../org/apache/falcon/client/FalconClient.java | 6 +- .../resource/AbstractExtensionManager.java | 39 ++---- .../falcon/resource/proxy/EntityProxyUtil.java | 49 +++++++ .../resource/proxy/ExtensionManagerProxy.java | 136 ++++++++++--------- .../proxy/SchedulableEntityManagerProxy.java | 41 +----- .../apache/falcon/unit/FalconUnitClient.java | 22 ++- .../falcon/unit/LocalExtensionManager.java | 42 ++++-- .../org/apache/falcon/unit/TestFalconUnit.java | 17 ++- .../falcon/resource/ExtensionManager.java | 22 +++ 11 files changed, 251 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 2a105dc..60578d0 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -139,10 +139,12 @@ public class FalconExtensionCLI extends FalconCLI{ result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); - result = client.suspendExtensionJob(jobName, doAsUser).getMessage(); + colo = getColo(colo); + result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); - result = client.resumeExtensionJob(jobName, doAsUser).getMessage(); + colo = getColo(colo); + result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.deleteExtensionJob(jobName, doAsUser).getMessage(); http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 7b8a606..49392c2 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -255,6 +255,25 @@ public abstract class AbstractFalconClient { * @return APIResult status of the deletion query. */ public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser); + + /** + * + * @param jobName name of the extension that has to be suspended. + * @param coloExpr comma separated list of colos where the operation has to be performed. + * @param doAsUser proxy user + * @return result status of the suspend operation. + */ + public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser); + + /** + * + * @param jobName name of the extension that has to be resumed. + * @param coloExpr comma separated list of colos where the operation has to be performed. + * @param doAsUser proxy user. + * @return result status of the resume operation. + */ + public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser); + /** * Prepares set of entities the extension has implemented to validate the extension job. * @param jobName job name of the extension job. http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 2772085..cf457ea 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1209,17 +1209,19 @@ public class FalconClient extends AbstractFalconClient { return getResponse(APIResult.class, clientResponse); } - public APIResult suspendExtensionJob(final String jobName, final String doAsUser) { + public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.SUSPEND.path, jobName) + .addQueryParam(COLO, coloExpr) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.SUSPEND); return getResponse(APIResult.class, clientResponse); } - public APIResult resumeExtensionJob(final String jobName, final String doAsUser) { + public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.RESUME.path, jobName) + .addQueryParam(COLO, coloExpr) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.RESUME); return getResponse(APIResult.class, clientResponse); http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index 63bf1b6..ff89682 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -20,11 +20,8 @@ package org.apache.falcon.resource; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.extensions.ExtensionStatus; -import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; @@ -37,8 +34,6 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; @@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; public static final String NAME = "name"; - protected static final String EXTENSION_TYPE = "type"; - protected static final String EXTENSION_DESC = "description"; - protected static final String EXTENSION_LOCATION = "location"; + private static final String EXTENSION_TYPE = "type"; + private static final String EXTENSION_DESC = "description"; + private static final String EXTENSION_LOCATION = "location"; protected static void validateExtensionName(final String extensionName) { if (StringUtils.isBlank(extensionName)) { @@ -114,28 +109,14 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } - protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean) - throws FalconException, IOException { - TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); - List<String> processes = extensionJobsBean.getProcesses(); - List<String> feeds = extensionJobsBean.getFeeds(); - entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS)); - entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED)); + protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean) + throws FalconException { + TreeMap<EntityType, List<String>> entityMap = new TreeMap<>(); + entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses()); + entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds()); return entityMap; } - private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException { - List<Entity> entities = new ArrayList<>(); - for (String entityName : entityNames) { - try { - entities.add(EntityUtil.getEntity(entityType, entityName)); - } catch (EntityNotRegisteredException e) { - LOG.error("Entity {} not found during deletion nothing to do", entityName); - } - } - return entities; - } - private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName); @@ -174,7 +155,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { return tags.substring(nameStart, nameEnd); } - public String disableExtension(String extensionName, String currentUser) { + protected String disableExtension(String extensionName, String currentUser) { validateExtensionName(extensionName); try { return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED); @@ -183,7 +164,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } - public String enableExtension(String extensionName, String currentUser) { + protected String enableExtension(String extensionName, String currentUser) { validateExtensionName(extensionName); try { return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED); http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java index ae0a61a..7d00442 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java @@ -120,6 +120,55 @@ class EntityProxyUtil { return results; } + APIResult proxySchedule(final String type, final String entity, final String coloExpr, + final Boolean skipDryRun, final String properties, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set<String> getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, + colo, skipDryRun, properties); + } + }.execute(); + } + + APIResult proxySuspend(final String type, final String entity, final String coloExpr, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set<String> getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, + colo); + } + }.execute(); + } + + APIResult proxyResume(final String type, final String entity, final String coloExpr, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set<String> getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, + colo); + } + }.execute(); + } + Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun, final HttpServletRequest bufferedRequest, Entity newEntity) { final Set<String> oldColos = getApplicableColos(type, entityName); http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 6f75dc7..8733170 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -205,11 +205,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { Response.Status.NOT_FOUND); } - SortedMap<EntityType, List<Entity>> entityMap; + SortedMap<EntityType, List<String>> entityMap; try { entityMap = getJobEntities(extensionJobsBean); scheduleEntities(entityMap, request, coloExpr); - } catch (FalconException | IOException | JAXBException e) { + } catch (FalconException e) { LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -221,27 +221,47 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult suspend(@PathParam("job-name") String jobName, - @DefaultValue("") @QueryParam("doAs") String doAsUser) { + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("colo") final String coloExpr) { checkIfExtensionServiceIsEnabled(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + LOG.error("Extension Job not found:" + jobName); + throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName, + Response.Status.NOT_FOUND); + } + try { - List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist."); + SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean); + suspendEntities(entityNameMap, coloExpr, request); + } catch (FalconException e) { + LOG.error("Error while suspending entities of the extension: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + + private void suspendEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr, + final HttpServletRequest request) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxySuspend(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest); } + } + } - for (Entity entity : entities) { - if (entity.getEntityType().isSchedulable()) { - if (getWorkflowEngine(entity).isActive(entity)) { - getWorkflowEngine(entity).suspend(entity); - } - } + private void resumeEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr, + final HttpServletRequest request) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxyResume(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest); } - } catch (FalconException | IOException e) { - LOG.error("Error when scheduling extension job: " + jobName + ": ", e); - throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } - return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); } @POST @@ -249,24 +269,23 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult resume(@PathParam("job-name") String jobName, + @Context HttpServletRequest request, + @QueryParam("colo") final String coloExpr, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + LOG.error("Extension Job not found:" + jobName); + throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName, + Response.Status.NOT_FOUND); + } try { - List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist."); - } - - for (Entity entity : entities) { - if (entity.getEntityType().isSchedulable()) { - if (getWorkflowEngine(entity).isSuspended(entity)) { - getWorkflowEngine(entity).resume(entity); - } - } - } - } catch (FalconException | IOException e) { - LOG.error("Error when resuming extension job " + jobName + ": ", e); + SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean); + resumeEntities(entityNameMap, coloExpr, request); + } catch (FalconException e) { + LOG.error("Error while resuming entities of the extension: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully"); @@ -288,11 +307,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { "Extension job " + jobName + " doesn't exist. Nothing to delete."); } - SortedMap<EntityType, List<Entity>> entityMap; + SortedMap<EntityType, List<String>> entityMap; try { entityMap = getJobEntities(extensionJobsBean); deleteEntities(entityMap, request); - } catch (FalconException | IOException | JAXBException e) { + } catch (FalconException e) { LOG.error("Error when deleting extension job: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -388,10 +407,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { checkIfExtensionIsEnabled(extensionName); checkIfExtensionJobExists(jobName, extensionName); SortedMap<EntityType, List<Entity>> entityMap; + SortedMap<EntityType, List<String>> entityNameMap; + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); submitEntities(extensionName, jobName, entityMap, config, request); - scheduleEntities(entityMap, request, coloExpr); + entityNameMap = getJobEntities(metaStore.getExtensionJobDetails(jobName)); + scheduleEntities(entityNameMap, request, coloExpr); } catch (FalconException | IOException | JAXBException e) { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -399,27 +421,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr) - throws FalconException, JAXBException, IOException { - for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { - for (final Entity entity : entry.getValue()) { - final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); - final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest); - final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity); - - new EntityProxy(entity.getEntityType().toString(), entity.getName()) { - @Override - protected Set<String> getColosToApply() { - return colos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest, - entity.getEntityType().toString(), - entity.getName(), colo, Boolean.FALSE, ""); - } - }.execute(); + private void scheduleEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request, + String coloExpr) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxySchedule(entityTypeEntry.getKey().name(), entityName, coloExpr, + Boolean.FALSE, "", bufferedRequest); } } } @@ -431,16 +439,14 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return new BufferedRequest(request); } - private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request) - throws IOException, JAXBException { - for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { - for (final Entity entity : entry.getValue()) { - final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); - final String entityType = entity.getEntityType().toString(); - final String entityName = entity.getName(); - entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest); + private void deleteEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request) + throws FalconException { + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + entityProxyUtil.proxyDelete(entityTypeEntry.getKey().name(), entityName, bufferedRequest); if (!embeddedMode) { - super.delete(bufferedRequest, entityType, entityName, currentColo); + super.delete(bufferedRequest, entityTypeEntry.getKey().name(), entityName, currentColo); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 8f41c48..5b5d690 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -469,18 +469,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @QueryParam("properties") final String properties) { final HttpServletRequest bufferedRequest = getBufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set<String> getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, - colo, skipDryRun, properties); - } - }.execute(); + return entityProxyUtil.proxySchedule(type, entity, coloExpr, skipDryRun, properties, bufferedRequest); } /** @@ -531,22 +520,11 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Dimension("colo") @QueryParam("colo") final String coloExpr) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set<String> getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, - colo); - } - }.execute(); + return entityProxyUtil.proxySuspend(type, entity, coloExpr, bufferedRequest); } /** - * Resume a supended entity. + * Resume a suspended entity. * @param request Servlet Request * @param type Valid options are feed or process. * @param entity Name of the entity. @@ -564,18 +542,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Dimension("colo") @QueryParam("colo") final String coloExpr) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set<String> getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, - colo); - } - }.execute(); + return entityProxyUtil.proxyResume(type, entity, coloExpr, bufferedRequest); } //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 6a65d2c..d76dbca 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -77,14 +77,14 @@ public class FalconUnitClient extends AbstractFalconClient { private static final String DEFAULT_ORDER_BY = "status"; private static final String DEFAULT_SORTED_ORDER = "asc"; - protected ConfigurationStore configStore; + private ConfigurationStore configStore; private AbstractWorkflowEngine workflowEngine; private LocalSchedulableEntityManager localSchedulableEntityManager; private LocalInstanceManager localInstanceManager; private LocalExtensionManager localExtensionManager; - public FalconUnitClient() throws FalconException { + FalconUnitClient() throws FalconException { configStore = ConfigurationStore.get(); workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); localSchedulableEntityManager = new LocalSchedulableEntityManager(); @@ -123,7 +123,6 @@ public class FalconUnitClient extends AbstractFalconClient { * @param entityName entity name * @param cluster cluster on which it has to be scheduled * @return - * @throws FalconException */ @Override public APIResult schedule(EntityType entityType, String entityName, String cluster, @@ -377,6 +376,23 @@ public class FalconUnitClient extends AbstractFalconClient { } } + @Override + public APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) { + try { + return localExtensionManager.suspendExtensionJob(jobName, coloExpr, doAsUser); + } catch (FalconException e) { + throw new FalconCLIException("Failed in suspending the extension job:" + jobName); + } + } + + @Override + public APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) { + try { + return localExtensionManager.resumeExtensionJob(jobName, coloExpr, doAsUser); + } catch (FalconException e) { + throw new FalconCLIException("Failed in resuming the extension job:" + jobName); + } + } @Override public APIResult getExtensionJobDetails(final String jobName) { http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 20ccfca..a32dbfa 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -39,7 +39,7 @@ import java.util.SortedMap; /** * A proxy implementation of the extension operations in local mode. */ -public class LocalExtensionManager extends AbstractExtensionManager { +class LocalExtensionManager extends AbstractExtensionManager { LocalExtensionManager() {} APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream, @@ -99,10 +99,10 @@ public class LocalExtensionManager extends AbstractExtensionManager { throws FalconException, IOException{ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); - SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean); - for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { - for (Entity entity : entry.getValue()) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null); + SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) { + for (String entityName : entry.getValue()) { + scheduleInternal(entry.getKey().name(), entityName, true, null); } } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully"); @@ -111,10 +111,10 @@ public class LocalExtensionManager extends AbstractExtensionManager { APIResult deleteExtensionJob(String jobName) throws FalconException, IOException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); - SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean); - for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { - for (Entity entity : entry.getValue()) { - delete(entity.getEntityType().name(), entity.getName(), null); + SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) { + for (String entityName : entry.getValue()) { + delete(entry.getKey().name(), entityName, null); } } ExtensionStore.getMetaStore().deleteExtensionJob(jobName); @@ -148,6 +148,30 @@ public class LocalExtensionManager extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); } + APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) { + for (String entityName : entityTypeEntry.getValue()) { + super.suspend(null, entityTypeEntry.getKey().name(), entityName, coloExpr); + } + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + + APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) { + for (String entityName : entityTypeEntry.getValue()) { + super.resume(null, entityTypeEntry.getKey().name(), entityName, coloExpr); + } + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) { return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 508a7bb..5717fc2 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -438,6 +438,8 @@ public class TestFalconUnit extends FalconUnitTestBase { clearDB(); submitCluster(); createExtensionPackage(); + createDir(PROCESS_APP_PATH); + fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml")); String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); Assert.assertEquals(result, "Extension :testExtension registered successfully."); @@ -454,6 +456,14 @@ public class TestFalconUnit extends FalconUnitTestBase { apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null); assertStatus(apiResult); + + apiResult = getClient().suspendExtensionJob(TEST_JOB, null, null); + assertStatus(apiResult); + apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); + Assert.assertEquals(apiResult.getMessage(), "SUSPENDED"); + + apiResult = getClient().resumeExtensionJob(TEST_JOB, null, null); + assertStatus(apiResult); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); @@ -487,8 +497,13 @@ public class TestFalconUnit extends FalconUnitTestBase { } } + @Test + public void testExtensionJobSuspendAndResume() throws Exception { + + } + - void copyExtensionJar(String destDirPath) throws IOException { + private void copyExtensionJar(String destDirPath) throws IOException { File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath()); for (File file : dir.listFiles()) { if (file.toString().endsWith(".jar")) { http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java index 3a6c9c0..ac05b0f 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java @@ -130,6 +130,28 @@ public class ExtensionManager extends AbstractExtensionManager { + "on Prism."); } + @POST + @Path("suspend/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult suspend(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + LOG.error("Suspend of an extension job is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("Suspend of an extension job is not supported on Server." + + "Please run your operation on Prism."); + } + + @POST + @Path("resume/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult resume(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + LOG.error("Resume of an extension job is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("Resume of an extension job is not supported on Server." + + "Please run your operation on Prism."); + } + @GET @Path("definition/{extension-name}") @Produces({MediaType.APPLICATION_JSON})