Repository: falcon Updated Branches: refs/heads/master 8054727ce -> 7c5822cf5
FALCON-2231 Changes to support Schedule of user extensions Author: sandeep <[email protected]> Reviewers: @pallavi,@pracheer,@praveen Closes #334 from sandeepSamudrala/FALCON-2231 and squashes the following commits: d32bf98 [sandeep] FALCON-2231 Fixed checkstyle issues. 2bbd7e2 [sandeep] FALCON-2231 Incoporated review comments and fixed test cases 2269806 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command 44d6f2a [sandeep] FALCON-2231 Corrected message in LocalExtensionManager f165282 [sandeep] FALCON-2231 Updated Error messages and throwing out exception in case of extension not found while scheduling 96a9a1d [sandeep] FALCON-2231 Rebased my patch ca320e0 [sandeep] FACLON-2231 Changes to support Schedule of user extensions 53831ea [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2231 cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon 456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7c5822cf Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7c5822cf Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7c5822cf Branch: refs/heads/master Commit: 7c5822cf50d07cb48ae83f3ebc761926b3bc5863 Parents: 8054727 Author: sandeep <[email protected]> Authored: Tue Jan 3 21:16:26 2017 +0530 Committer: Praveen Adlakha <[email protected]> Committed: Tue Jan 3 21:16:26 2017 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/cli/FalconEntityCLI.java | 2 +- .../apache/falcon/cli/FalconExtensionCLI.java | 48 ++++++++----- .../falcon/client/AbstractFalconClient.java | 7 ++ .../org/apache/falcon/client/FalconClient.java | 3 +- .../extensions/jdbc/ExtensionMetaStore.java | 12 +++- .../extensions/jdbc/ExtensionMetaStoreTest.java | 2 + .../falcon/resource/AbstractEntityManager.java | 13 +++- .../resource/proxy/ExtensionManagerProxy.java | 44 +++++++----- .../apache/falcon/unit/FalconUnitClient.java | 9 +++ .../falcon/unit/LocalExtensionManager.java | 72 +++++++++++++------- .../org/apache/falcon/unit/TestFalconUnit.java | 4 +- .../falcon/resource/ExtensionManager.java | 72 +++++++++++++++++--- 12 files changed, 213 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java index a8aea52..dcac8e8 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java @@ -369,7 +369,7 @@ public class FalconEntityCLI extends FalconCLI { OUT.get().println(result); } - private void validateColo(Set<String> optionsList) { + static void validateColo(Set<String> optionsList) { if (optionsList.contains(FalconCLIConstants.COLO_OPT)) { throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 0343aa8..2a105dc 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -33,38 +33,44 @@ import org.apache.falcon.client.FalconClient; import org.apache.falcon.resource.ExtensionInstanceList; import org.apache.falcon.resource.ExtensionJobList; +import java.io.IOException; import java.io.PrintStream; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.falcon.cli.FalconEntityCLI.validateColo; +import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT; +import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION; + /** * Falcon extensions Command Line Interface - wraps the RESTful API for extensions. */ -public class FalconExtensionCLI { +public class FalconExtensionCLI extends FalconCLI{ public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out); // Extension commands - public static final String ENUMERATE_OPT = "enumerate"; - public static final String DEFINITION_OPT = "definition"; - public static final String DESCRIBE_OPT = "describe"; - public static final String INSTANCES_OPT = "instances"; - public static final String UNREGISTER_OPT = "unregister"; - public static final String DETAIL_OPT = "detail"; - public static final String REGISTER_OPT = "register"; - public static final String ENABLE_OPT = "enable"; - public static final String DISABLE_OPT = "disable"; + private static final String ENUMERATE_OPT = "enumerate"; + private static final String DEFINITION_OPT = "definition"; + private static final String DESCRIBE_OPT = "describe"; + private static final String INSTANCES_OPT = "instances"; + private static final String UNREGISTER_OPT = "unregister"; + private static final String DETAIL_OPT = "detail"; + private static final String REGISTER_OPT = "register"; + private static final String ENABLE_OPT = "enable"; + private static final String DISABLE_OPT = "disable"; // Input parameters - public static final String EXTENSION_NAME_OPT = "extensionName"; - public static final String JOB_NAME_OPT = "jobName"; + private static final String EXTENSION_NAME_OPT = "extensionName"; + private static final String JOB_NAME_OPT = "jobName"; public static final String DESCRIPTION = "description"; - public static final String PATH = "path"; + private static final String PATH = "path"; - public FalconExtensionCLI() { + FalconExtensionCLI() throws Exception { + super(); } - public void extensionCommand(CommandLine commandLine, FalconClient client) { + void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException { Set<String> optionsList = new HashSet<>(); for (Option option : commandLine.getOptions()) { optionsList.add(option.getOpt()); @@ -77,6 +83,8 @@ public class FalconExtensionCLI { String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT); String path = commandLine.getOptionValue(FalconCLIConstants.PATH); String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION); + String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT); + colo = getColo(colo); if (optionsList.contains(ENUMERATE_OPT)) { result = client.enumerateExtensions().getMessage(); @@ -105,6 +113,7 @@ public class FalconExtensionCLI { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(jobName, JOB_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + validateColo(optionsList); result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(REGISTER_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); @@ -114,6 +123,7 @@ public class FalconExtensionCLI { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(jobName, JOB_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + validateColo(optionsList); result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); @@ -125,7 +135,8 @@ public class FalconExtensionCLI { result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); - result = client.scheduleExtensionJob(jobName, doAsUser).getMessage(); + colo = getColo(colo); + result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.suspendExtensionJob(jobName, doAsUser).getMessage(); @@ -170,7 +181,7 @@ public class FalconExtensionCLI { OUT.get().println(result); } - public Options createExtensionOptions() { + Options createExtensionOptions() { Options extensionOptions = new Options(); Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions"); @@ -192,6 +203,8 @@ public class FalconExtensionCLI { Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension"); Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will " + "make the extension available for instantiation for all users."); + Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION); + colo.setRequired(false); OptionGroup group = new OptionGroup(); group.addOption(enumerate); @@ -249,6 +262,7 @@ public class FalconExtensionCLI { extensionOptions.addOption(filePath); extensionOptions.addOption(path); extensionOptions.addOption(description); + extensionOptions.addOption(colo); return extensionOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 3181b64..7b8a606 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -222,6 +222,13 @@ public abstract class AbstractFalconClient { String doAsUser); /** + * Schedules the set of entities that are part of the extension. + * @param jobName extensionJob that needs to be scheduled. + * @return APIResult stating status of scheduling the extension. + */ + public abstract APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser); + + /** * Prepares set of entities the extension has implemented and stage them to a local directory and submits and * schedules them. * @param extensionName extension which is available in the store. http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 8401c9c..2772085 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1200,9 +1200,10 @@ public class FalconClient extends AbstractFalconClient { } } - public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) { + public APIResult scheduleExtensionJob(String jobName, final String coloExpr, final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.SCHEDULE.path, jobName) + .addQueryParam(COLO, coloExpr) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.SCHEDULE); return getResponse(APIResult.class, clientResponse); http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index e53069a..9126b67 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -19,6 +19,7 @@ package org.apache.falcon.extensions.jdbc; import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.ExtensionType; +import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.persistence.PersistenceConstants; @@ -145,6 +146,11 @@ public class ExtensionMetaStore { public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes, byte[] config) { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + boolean alreadySubmitted = false; + if (metaStore.getExtensionJobDetails(jobName) != null){ + alreadySubmitted = true; + } ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean(); Date currentTime = new Date(System.currentTimeMillis()); extensionJobsBean.setJobName(jobName); @@ -157,7 +163,11 @@ public class ExtensionMetaStore { EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); - entityManager.persist(extensionJobsBean); + if (alreadySubmitted) { + entityManager.merge(extensionJobsBean); + } else { + entityManager.persist(extensionJobsBean); + } } finally { commitAndCloseTransaction(entityManager); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index 1688abb..e3327e8 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -83,6 +83,8 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore { byte[] config = new byte[0]; stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); + //storing again to check for entity manager merge to let submission go forward. + stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1); Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed"); http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 81b0448..8daf8c7 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -128,12 +128,21 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) { - Set<String> colos; final Set<String> applicableColos = getApplicableColos(type, entity); + return getColosFromExpression(coloExpr, applicableColos); + } + + protected Set<String> getColosFromExpression(String coloExpr, String type, Entity entity) { + final Set<String> applicableColos = getApplicableColos(type, entity); + return getColosFromExpression(coloExpr, applicableColos); + } + + private Set<String> getColosFromExpression(String coloExpr, Set<String> applicableColos) { + Set<String> colos; if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) { colos = applicableColos; } else { - colos = new HashSet<String>(Arrays.asList(coloExpr.split(","))); + colos = new HashSet<>(Arrays.asList(coloExpr.split(","))); if (!applicableColos.containsAll(colos)) { throw FalconWebException.newAPIException("Given colos not applicable for entity operation"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 2b5cbe7..6f75dc7 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -191,21 +191,26 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult schedule(@PathParam("job-name") String jobName, + @Context HttpServletRequest request, + @QueryParam("colo") final String coloExpr, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName()); - try { - List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist."); - } + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + LOG.error("Extension Job not found:" + jobName); + throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName, + Response.Status.NOT_FOUND); + } - for (Entity entity : entities) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); - } - } catch (FalconException | IOException e) { - LOG.error("Error when scheduling extension job: " + jobName + ": ", e); + SortedMap<EntityType, List<Entity>> entityMap; + try { + entityMap = getJobEntities(extensionJobsBean); + scheduleEntities(entityMap, request, coloExpr); + } catch (FalconException | IOException | JAXBException e) { + LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully"); @@ -375,6 +380,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser, @QueryParam("jobName") String jobName, + @QueryParam("colo") final String coloExpr, @FormDataParam("processes") List<FormDataBodyPart> processForms, @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { @@ -385,7 +391,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); submitEntities(extensionName, jobName, entityMap, config, request); - scheduleEntities(entityMap, request); + scheduleEntities(entityMap, request, coloExpr); } catch (FalconException | IOException | JAXBException e) { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -393,13 +399,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request) + private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr) throws FalconException, JAXBException, IOException { for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { for (final Entity entity : entry.getValue()) { final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest); - final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); + final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity); new EntityProxy(entity.getEntityType().toString(), entity.getName()) { @Override @@ -441,8 +447,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } private void submitEntities(String extensionName, String jobName, - SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, - HttpServletRequest request) throws FalconException, IOException, JAXBException { + SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, + HttpServletRequest request) + throws FalconException, IOException, JAXBException { List<Entity> feeds = entityMap.get(EntityType.FEED); List<Entity> processes = entityMap.get(EntityType.PROCESS); validateFeeds(feeds); @@ -766,6 +773,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private static void checkIfExtensionServiceIsEnabled() { if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) { + LOG.error(ExtensionService.SERVICE_NAME + " is not enabled."); throw FalconWebException.newAPIException( ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND); } @@ -773,7 +781,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private static void checkIfExtensionIsEnabled(String extensionName) { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); - if (metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) { + if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) { + LOG.error("Extension: " + extensionName + " is in disabled state."); throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.", Response.Status.INTERNAL_SERVER_ERROR); } @@ -783,6 +792,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) { + LOG.error("Extension job with name: " + extensionName + " already exists."); throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.", Response.Status.INTERNAL_SERVER_ERROR); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 2a40611..6a65d2c 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -334,6 +334,15 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override + public APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) { + try { + return localExtensionManager.scheduleExtensionJob(jobName, coloExpr, doAsUser); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed to delete the extension job:" + coloExpr); + } + } + + @Override public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { InputStream configStream = getServletInputStream(configPath); http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 1e9b15a..20ccfca 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -40,55 +40,75 @@ import java.util.SortedMap; * A proxy implementation of the extension operations in local mode. */ public class LocalExtensionManager extends AbstractExtensionManager { - public LocalExtensionManager() {} + LocalExtensionManager() {} - public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, - SortedMap<EntityType, List<Entity>> entityMap) + APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream, + SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { - - for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ - for(Entity entity : entry.getValue()){ + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (Entity entity : entry.getValue()) { submitInternal(entity, "falconUser"); } } + storeExtension(extensionName, jobName, configStream, entityMap); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } - public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream, - SortedMap<EntityType, List<Entity>> entityMap) + APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream, + SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { - List<String> feedNames = new ArrayList<>(); - List<String> processNames = new ArrayList<>(); for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ for(Entity entity : entry.getValue()){ submitInternal(entity, "falconUser"); } } - for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ - for(Entity entity : entry.getValue()){ + storeExtension(extensionName, jobName, configStream, entityMap); + + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (Entity entity : entry.getValue()) { scheduleInternal(entry.getKey().name(), entity.getName(), null, null); } } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully" + + jobName); + } + + private void storeExtension(String extensionName, String jobName, InputStream configStream, SortedMap<EntityType, + List<Entity>> entityMap) throws IOException { byte[] configBytes = null; if (configStream != null) { configBytes = IOUtils.toByteArray(configStream); } - for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ - for(final Entity entity : entry.getValue()){ - if (entity.getEntityType().equals(EntityType.FEED)){ + List<String> feedNames = new ArrayList<>(); + List<String> processNames = new ArrayList<>(); + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { + if (entity.getEntityType().equals(EntityType.FEED)) { feedNames.add(entity.getName()); - }else{ + } else { processNames.add(entity.getName()); } } } ExtensionStore.getMetaStore().storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); + } - return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); + APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) + throws FalconException, IOException{ + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (Entity entity : entry.getValue()) { + scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null); + } + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully"); } - public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{ + APIResult deleteExtensionJob(String jobName) throws FalconException, IOException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean); @@ -101,8 +121,8 @@ public class LocalExtensionManager extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully"); } - public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream, - SortedMap<EntityType, List<Entity>> entityMap) + APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream, + SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); @@ -128,27 +148,27 @@ public class LocalExtensionManager extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); } - public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) { + APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) { return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); } - public APIResult unRegisterExtension(String extensionName) { + APIResult unRegisterExtension(String extensionName) { return super.deleteExtensionMetadata(extensionName); } - public APIResult getExtensionJobDetails(String jobName){ + APIResult getExtensionJobDetails(String jobName){ return super.getExtensionJobDetail(jobName); } - public APIResult disableExtension(String extensionName) { + APIResult disableExtension(String extensionName) { return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser())); } - public APIResult enableExtension(String extensionName) { + APIResult enableExtension(String extensionName) { return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser())); } - public APIResult getExtensionDetails(String extensionName){ + APIResult getExtensionDetails(String extensionName){ return super.getExtensionDetail(extensionName); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index a41743d..508a7bb 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -444,7 +444,7 @@ public class TestFalconUnit extends FalconUnitTestBase { createDir(PROCESS_APP_PATH); copyExtensionJar(packageBuildLib); - APIResult apiResult = submitAndScheduleExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); + APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); result = getExtensionJobDetails(TEST_JOB); JSONObject resultJson = new JSONObject(result); @@ -452,6 +452,8 @@ public class TestFalconUnit extends FalconUnitTestBase { Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testPipeline"); + apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null); + assertStatus(apiResult); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); http://git-wip-us.apache.org/repos/asf/falcon/blob/7c5822cf/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java index 2160320..3a6c9c0 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java @@ -17,39 +17,93 @@ */ package org.apache.falcon.resource; +import com.sun.jersey.multipart.FormDataBodyPart; +import com.sun.jersey.multipart.FormDataParam; import org.apache.falcon.FalconWebException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.POST; import javax.ws.rs.Consumes; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; +import java.io.InputStream; +import java.util.List; /** * This class provides RESTful API for the extensions. */ @Path("extension") -public class ExtensionManager { - public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class); +public class ExtensionManager extends AbstractExtensionManager { + private static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class); @GET @Path("enumerate") @Produces({MediaType.APPLICATION_JSON}) - public Response getExtensions() { + public APIResult getExtensions() { LOG.error("Enumerate is not supported on Server.Please run your operation on Prism "); throw FalconWebException.newAPIException("Enumerate is not supported on Server. Please run your operation " + "on Prism."); } + @POST + @Path("schedule/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult schedule(@PathParam("job-name") String jobName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + LOG.error("schedule is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("schedule is not supported on Server. Please run your operation " + + "on Prism."); + } + + @POST + @Path("submit/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA, + MediaType.APPLICATION_OCTET_STREAM}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult submit( + @PathParam("extension-name") String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("jobName") String jobName, + @FormDataParam("processes") List<FormDataBodyPart> processForms, + @FormDataParam("feeds") List<FormDataBodyPart> feedForms, + @FormDataParam("config") InputStream config) { + LOG.error("submit is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("submit is not supported on Server. Please run your operation " + + "on Prism."); + } + + @POST + @Path("submitAndSchedule/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult submitAndSchedule( + @PathParam("extension-name") String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("jobName") String jobName, + @FormDataParam("processes") List<FormDataBodyPart> processForms, + @FormDataParam("feeds") List<FormDataBodyPart> feedForms, + @FormDataParam("config") InputStream config) { + LOG.error("submitAndSchedule is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("submitAndSchedule is not supported on Server. Please run your " + + "operation on Prism."); + } + @GET @Path("describe/{extension-name}") @Produces(MediaType.TEXT_PLAIN) - public String getExtensionDescription( + public APIResult getExtensionDescription( @PathParam("extension-name") String extensionName) { LOG.error("Describe is not supported on Server.Please run your operation on Prism "); throw FalconWebException.newAPIException("Describe is not supported on Server. Please run your operation " @@ -59,7 +113,7 @@ public class ExtensionManager { @GET @Path("detail/{extension-name}") @Produces({MediaType.APPLICATION_JSON}) - public Response getDetail(@PathParam("extension-name") String extensionName) { + public APIResult getDetail(@PathParam("extension-name") String extensionName) { LOG.error("Detail is not supported on Server.Please run your operation on Prism "); throw FalconWebException.newAPIException("Detail is not supported on Server. Please run your operation " + "on Prism."); @@ -69,8 +123,8 @@ public class ExtensionManager { @Path("unregister/{extension-name}") @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces(MediaType.TEXT_PLAIN) - public String deleteExtensionMetadata( - @PathParam("extension-name") String extensionName){ + public APIResult deleteExtensionMetadata( + @PathParam("extension-name") String extensionName) { LOG.error("Unregister is not supported on Server.Please run your operation on Prism "); throw FalconWebException.newAPIException("Unregister is not supported on Server. Please run your operation " + "on Prism."); @@ -79,7 +133,7 @@ public class ExtensionManager { @GET @Path("definition/{extension-name}") @Produces({MediaType.APPLICATION_JSON}) - public String getExtensionDefinition( + public APIResult getExtensionDefinition( @PathParam("extension-name") String extensionName) { LOG.error("Definition is not supported on Server.Please run your operation on Prism "); throw FalconWebException.newAPIException("Definition is not supported on Server. Please run your operation "
