Repository: falcon Updated Branches: refs/heads/master c79e5e4d3 -> 3e7e08f77
FALCON-2210 Server side changes in submit and submitAndSchedule apis to accept list of feeds and processes Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #313 from sandeepSamudrala/FALCON-2210 and squashes the following commits: 3b7a3ae [sandeep] FALCON-2210 applied tags to the entities for user extensions f08cf15 [sandeep] FALCON-2210 Fixed falcon unit client build issues 98d382e [sandeep] FALCON-2210 Incorporated review comments 4004ae7 [sandeep] FALCON-2210 Incorporated review comments d5e4c53 [sandeep] FALCON-2210 Server side changes in submit and submitAndSchedule apis to accept list of feeds and processes 05b87fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2210 2bd685f [sandeep] FALCON-2201 Fixed checkstyle issues c7422e6 [sandeep] FALCON-2201 Incorporated review comments. Removed applying tags from client. I will move that to Server side in the server side changes 86446ad [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 432cdfd [sandeep] FALCON-2201 Incorporated review comments a0ce5e0 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 519a877 [sandeep] FALCON-2201 Fixed checkstyle issues c101c7b [sandeep] FALCON-2201 Incorporated review comments bf0e6ed [sandeep] FALCON-2201 Incorporated review comments and few client side changes adfd318 [sandeep] FALCON-2201 Falcon Unit changes for extension support and falcon unit tests for extensions and fixes. 03f0c3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3e7e08f7 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3e7e08f7 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3e7e08f7 Branch: refs/heads/master Commit: 3e7e08f77225d09250d0ac8a9d9fa38c5f69c3f5 Parents: c79e5e4 Author: sandeep <[email protected]> Authored: Mon Dec 12 09:42:35 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Dec 12 09:42:35 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 2 +- .../entity/parser/ProcessEntityParser.java | 39 +++-- .../resource/extensions/ExtensionManager.java | 156 ++++++++++++++----- .../apache/falcon/unit/FalconUnitClient.java | 26 +++- .../falcon/unit/LocalExtensionManager.java | 17 +- 5 files changed, 169 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 9b88abe..984b6a3 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -89,7 +89,7 @@ public class FalconExtensionCLI { } else if (optionsList.contains(UNREGISTER_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.unregisterExtension(extensionName); - }else if (optionsList.contains(DETAIL_OPT)) { + } else if (optionsList.contains(DETAIL_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.getExtensionDetail(extensionName); result = prettyPrintJson(result); http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index 38fa3ae..b977752 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -74,6 +74,10 @@ public class ProcessEntityParser extends EntityParser<Process> { @Override public void validate(Process process) throws FalconException { + validate(process, true); + } + + public void validate(Process process, boolean checkDependentFeeds) throws FalconException { if (process.getTimezone() == null) { process.setTimezone(TimeZone.getTimeZone("UTC")); } @@ -106,24 +110,27 @@ public class ProcessEntityParser extends EntityParser<Process> { validateHDFSPaths(process, clusterName); validateProperties(process); - if (process.getInputs() != null) { - for (Input input : process.getInputs().getInputs()) { - validateEntityExists(EntityType.FEED, input.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); - CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName); - CrossEntityValidations.validateInstanceRange(process, input, feed); - validateInputPartition(input, feed); - validateOptionalInputsForTableStorage(feed, input); + if (checkDependentFeeds) { + if (process.getInputs() != null) { + for (Input input : process.getInputs().getInputs()) { + validateEntityExists(EntityType.FEED, input.getFeed()); + Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); + CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); + CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName); + CrossEntityValidations.validateInstanceRange(process, input, feed); + validateInputPartition(input, feed); + validateOptionalInputsForTableStorage(feed, input); + } } - } - if (process.getOutputs() != null) { - for (Output output : process.getOutputs().getOutputs()) { - validateEntityExists(EntityType.FEED, output.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); - CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateInstance(process, output, feed); + + if (process.getOutputs() != null) { + for (Output output : process.getOutputs().getOutputs()) { + validateEntityExists(EntityType.FEED, output.getFeed()); + Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); + CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); + CrossEntityValidations.validateInstance(process, output, feed); + } } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 79e3691..241af31 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -18,16 +18,20 @@ package org.apache.falcon.resource.extensions; +import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataParam; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.parser.ProcessEntityParser; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.Extension; import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.ExtensionService; @@ -43,6 +47,7 @@ import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.service.Services; import org.apache.falcon.util.DeploymentUtil; +import org.apache.hadoop.security.authorize.AuthorizationException; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -79,10 +84,10 @@ import java.util.Properties; public class ExtensionManager extends AbstractSchedulableEntityManager { public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class); - public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; - public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; - public static final String ASCENDING_SORT_ORDER = "asc"; - public static final String DESCENDING_SORT_ORDER = "desc"; + private static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; + private static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; + private static final String ASCENDING_SORT_ORDER = "asc"; + private static final String DESCENDING_SORT_ORDER = "desc"; private Extension extension = new Extension(); private static final String EXTENSION_RESULTS = "extensions"; @@ -91,7 +96,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { private static final String EXTENSION_NAME = "name"; private static final String EXTENSION_TYPE = "type"; private static final String EXTENSION_DESC = "description"; - public static final String EXTENSION_LOCATION = "location"; + private static final String EXTENSION_LOCATION = "location"; private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json"; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -294,36 +299,63 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @POST @Path("submit/{extension-name}") - @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA, + MediaType.APPLICATION_OCTET_STREAM}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult submit( @PathParam("extension-name") String extensionName, @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser, @QueryParam("jobName") String jobName, - @FormDataParam("entities") List<Entity> entities, + @FormDataParam("processes") List<FormDataBodyPart> processForms, + @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + Map<EntityType, List<Entity>> entityMap; + try { - entities = getEntityList(extensionName, entities, config); - submitEntities(extensionName, doAsUser, jobName, entities, config); + entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); + submitEntities(extensionName, doAsUser, jobName, entityMap, config); } catch (FalconException | IOException e) { - LOG.error("Error when submitting extension job: ", e); + LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } - private void validateEntities(List<Entity> entities) throws FalconException { - for (Entity entity : entities) { - if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.PROCESS.equals(entity.getEntityType())) { - LOG.error("Cluster entity is not allowed for submission via submitEntities: {}", entity.getName()); - throw new FalconException("Cluster entity is not allowed for submission in extensions submission"); + private Map<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, + List<FormDataBodyPart> feedForms, + List<FormDataBodyPart> processForms, InputStream config) + throws FalconException, IOException{ + List<Entity> processes = getProcesses(processForms); + List<Entity> feeds = getFeeds(feedForms); + ExtensionType extensionType = getExtensionType(extensionName); + List<Entity> entities; + Map<EntityType, List<Entity>> entityMap = new HashMap<>(); + if (ExtensionType.TRUSTED.equals(extensionType)) { + entities = generateEntities(extensionName, config); + List<Entity> trustedFeeds = new ArrayList<>(); + List<Entity> trustedProcesses = new ArrayList<>(); + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + trustedFeeds.add(entity); + } else { + trustedProcesses.add(entity); + } } - super.validate(entity); + entityMap.put(EntityType.PROCESS, trustedProcesses); + entityMap.put(EntityType.FEED, trustedFeeds); + return entityMap; + } else { + EntityUtil.applyTags(extensionName, jobName, processes); + EntityUtil.applyTags(extensionName, jobName, feeds); + entityMap.put(EntityType.PROCESS, processes); + entityMap.put(EntityType.FEED, feeds); + return entityMap; } } + private ExtensionType getExtensionType(String extensionName) { ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); ExtensionBean extensionDetails = metaStore.getDetail(extensionName); @@ -339,50 +371,90 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser, @QueryParam("jobName") String jobName, - @FormDataParam("entities") List<Entity> entities, + @FormDataParam("processes") List<FormDataBodyPart> processForms, + @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + Map<EntityType, List<Entity>> entityMap; try { - entities = getEntityList(extensionName, entities, config); - submitEntities(extensionName, doAsUser, jobName, entities, config); - for (Entity entity : entities) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); - } + entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); + submitEntities(extensionName, doAsUser, jobName, entityMap, config); + scheduleEntities(entityMap); } catch (FalconException | IOException e) { - LOG.error("Error when submitting extension job: ", e); + LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - protected void submitEntities(String extensionName, String doAsUser, String jobName, List<Entity> entities, - InputStream configStream) throws FalconException, IOException { - validateEntities(entities); - List<String> feeds = new ArrayList<>(); - List<String> processes = new ArrayList<>(); - for (Entity entity : entities) { - submitInternal(entity, doAsUser); - if (EntityType.FEED.equals(entity.getEntityType())) { - feeds.add(entity.getName()); - } else if (EntityType.PROCESS.equals(entity.getEntityType())) { - processes.add(entity.getName()); + private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) { + List<Entity> feeds = new ArrayList<>(); + if (feedForms != null && !feedForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : feedForms) { + feeds.add(formDataBodyPart.getValueAs(Feed.class)); } } - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + return feeds; + } + + private List<Entity> getProcesses(List<FormDataBodyPart> processForms) { + List<Entity> processes = new ArrayList<>(); + if (processForms != null && !processForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : processForms) { + processes.add(formDataBodyPart.getValueAs(Process.class)); + } + } + return processes; + } + + protected void submitEntities(String extensionName, String doAsUser, String jobName, + Map<EntityType, List<Entity>> entityMap, InputStream configStream) + throws FalconException, IOException { + List<Entity> feeds = entityMap.get(EntityType.FEED); + List<Entity> processes = entityMap.get(EntityType.PROCESS); + validateFeeds(feeds); + validateProcesses(processes); + List<String> feedNames = new ArrayList<>(); + List<String> processNames = new ArrayList<>(); + for (Entity feed : feeds) { + submitInternal(feed, doAsUser); + feedNames.add(feed.getName()); + } + for (Entity process: processes) { + submitInternal(process, doAsUser); + processNames.add(process.getName()); + } + + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); byte[] configBytes = null; if (configStream != null) { configBytes = IOUtils.toByteArray(configStream); } - metaStore.storeExtensionJob(jobName, extensionName, feeds, processes, configBytes); + metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } - private List<Entity> getEntityList(String extensionName, List<Entity> entities, InputStream config) - throws FalconException, IOException { - ExtensionType extensionType = getExtensionType(extensionName); - if (ExtensionType.TRUSTED.equals(extensionType)) { - entities = generateEntities(extensionName, config); + protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) throws FalconException, + AuthorizationException { + for (Object feed: entityMap.get(EntityType.FEED)) { + scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null); + } + for (Object process: entityMap.get(EntityType.PROCESS)) { + scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null, null); + } + } + + + private void validateFeeds(List<Entity> feeds) throws FalconException { + for (Entity feed : feeds) { + super.validate(feed); + } + } + + private void validateProcesses(List<Entity> processes) throws FalconException { + ProcessEntityParser processEntityParser = new ProcessEntityParser(); + for (Entity process : processes) { + processEntityParser.validate((Process)process, false); } - return entities; } @POST http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 00c2ad1..0eb0ab3 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -58,6 +58,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -284,13 +285,30 @@ public class FalconUnitClient extends AbstractFalconClient { InputStream configStream = getServletInputStream(configPath); try { - List<Entity> entities = getEntities(extensionName, jobName, configStream); - return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entities); + Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); + return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entityMap); } catch (FalconException | IOException e) { throw new FalconCLIException("Failed in submitting extension job " + jobName); } } + private Map<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) { + List<Entity> entities = getEntities(extensionName, jobName, configStream); + List<Entity> feeds = new ArrayList<>(); + List<Entity> processes = new ArrayList<>(); + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + feeds.add(entity); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + processes.add(entity); + } + } + Map<EntityType, List<Entity>> entityMap = new HashMap<>(); + entityMap.put(EntityType.PROCESS, processes); + entityMap.put(EntityType.FEED, feeds); + return entityMap; + } + private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream) { String packagePath = ExtensionStore.get().getMetaStore().getDetail(extensionName).getLocation(); List<Entity> entities; @@ -308,9 +326,9 @@ public class FalconUnitClient extends AbstractFalconClient { String doAsUser) { InputStream configStream = getServletInputStream(configPath); try { - List<Entity> entities = getEntities(extensionName, jobName, configStream); + Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); return localExtensionManager.submitAndSchedulableExtensionJob(extensionName, jobName, configStream, - entities); + entityMap); } catch (FalconException | IOException e) { throw new FalconCLIException("Failed in submitting extension job " + jobName); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 5d2710c..553e7d6 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -20,12 +20,14 @@ package org.apache.falcon.unit; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.extensions.ExtensionManager; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; /** * A proxy implementation of the extension operations in local mode. @@ -33,18 +35,17 @@ import java.util.List; public class LocalExtensionManager extends ExtensionManager { public LocalExtensionManager() {} - public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, List<Entity> entities) - throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entities, config); + public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, + Map<EntityType, List<Entity>> entityMap) throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entityMap, config); return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream config, - List<Entity> entities) throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entities, config); - for (Entity entity : entities) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); - } + Map<EntityType, List<Entity>> entityMap) + throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entityMap, config); + scheduleEntities(entityMap); return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); }
