Repository: falcon
Updated Branches:
  refs/heads/master c79e5e4d3 -> 3e7e08f77


FALCON-2210 Server-side changes in submit and submitAndSchedule APIs to accept 
lists of feeds and processes

Author: sandeep <[email protected]>

Reviewers: @pallavi-rao

Closes #313 from sandeepSamudrala/FALCON-2210 and squashes the following 
commits:

3b7a3ae [sandeep] FALCON-2210 applied tags to the entities for user extensions
f08cf15 [sandeep] FALCON-2210 Fixed falcon unit client build issues
98d382e [sandeep] FALCON-2210 Incorporated review comments
4004ae7 [sandeep] FALCON-2210 Incorporated review comments
d5e4c53 [sandeep] FALCON-2210 Server side changes in submit and 
submitAndSchedule apis to accept list of feeds and processes
05b87fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 
into FALCON-2210
2bd685f [sandeep] FALCON-2201 Fixed checkstyle issues
c7422e6 [sandeep] FALCON-2201 Incorporated review comments. Removed applying 
tags from the client; that will move to the server side in the server-side changes
86446ad [sandeep] Merge branch 'master' of https://github.com/apache/falcon 
into FALCON-2201
432cdfd [sandeep] FALCON-2201 Incorporated review comments
a0ce5e0 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 
into FALCON-2201
519a877 [sandeep] FALCON-2201 Fixed checkstyle issues
c101c7b [sandeep] FALCON-2201 Incorporated review comments
bf0e6ed [sandeep] FALCON-2201 Incorporated review comments and a few client-side 
changes
adfd318 [sandeep] FALCON-2201 Falcon Unit changes for extension support and 
falcon unit tests for extensions and fixes.
03f0c3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon 
into FALCON-2201
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next 
instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3e7e08f7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3e7e08f7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3e7e08f7

Branch: refs/heads/master
Commit: 3e7e08f77225d09250d0ac8a9d9fa38c5f69c3f5
Parents: c79e5e4
Author: sandeep <[email protected]>
Authored: Mon Dec 12 09:42:35 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Dec 12 09:42:35 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   |   2 +-
 .../entity/parser/ProcessEntityParser.java      |  39 +++--
 .../resource/extensions/ExtensionManager.java   | 156 ++++++++++++++-----
 .../apache/falcon/unit/FalconUnitClient.java    |  26 +++-
 .../falcon/unit/LocalExtensionManager.java      |  17 +-
 5 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java 
b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 9b88abe..984b6a3 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -89,7 +89,7 @@ public class FalconExtensionCLI {
         } else if (optionsList.contains(UNREGISTER_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.unregisterExtension(extensionName);
-        }else if (optionsList.contains(DETAIL_OPT)) {
+        } else if (optionsList.contains(DETAIL_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDetail(extensionName);
             result = prettyPrintJson(result);

http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 38fa3ae..b977752 100644
--- 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -74,6 +74,10 @@ public class ProcessEntityParser extends 
EntityParser<Process> {
 
     @Override
     public void validate(Process process) throws FalconException {
+        validate(process, true);
+    }
+
+    public void validate(Process process, boolean checkDependentFeeds) throws 
FalconException {
         if (process.getTimezone() == null) {
             process.setTimezone(TimeZone.getTimeZone("UTC"));
         }
@@ -106,24 +110,27 @@ public class ProcessEntityParser extends 
EntityParser<Process> {
             validateHDFSPaths(process, clusterName);
             validateProperties(process);
 
-            if (process.getInputs() != null) {
-                for (Input input : process.getInputs().getInputs()) {
-                    validateEntityExists(EntityType.FEED, input.getFeed());
-                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, 
input.getFeed());
-                    CrossEntityValidations.validateFeedDefinedForCluster(feed, 
clusterName);
-                    
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, 
clusterName);
-                    CrossEntityValidations.validateInstanceRange(process, 
input, feed);
-                    validateInputPartition(input, feed);
-                    validateOptionalInputsForTableStorage(feed, input);
+            if (checkDependentFeeds) {
+                if (process.getInputs() != null) {
+                    for (Input input : process.getInputs().getInputs()) {
+                        validateEntityExists(EntityType.FEED, input.getFeed());
+                        Feed feed = 
ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+                        
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
+                        
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, 
clusterName);
+                        CrossEntityValidations.validateInstanceRange(process, 
input, feed);
+                        validateInputPartition(input, feed);
+                        validateOptionalInputsForTableStorage(feed, input);
+                    }
                 }
-            }
 
-            if (process.getOutputs() != null) {
-                for (Output output : process.getOutputs().getOutputs()) {
-                    validateEntityExists(EntityType.FEED, output.getFeed());
-                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, 
output.getFeed());
-                    CrossEntityValidations.validateFeedDefinedForCluster(feed, 
clusterName);
-                    CrossEntityValidations.validateInstance(process, output, 
feed);
+
+                if (process.getOutputs() != null) {
+                    for (Output output : process.getOutputs().getOutputs()) {
+                        validateEntityExists(EntityType.FEED, 
output.getFeed());
+                        Feed feed = 
ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+                        
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
+                        CrossEntityValidations.validateInstance(process, 
output, feed);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 79e3691..241af31 100644
--- 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -18,16 +18,20 @@
 
 package org.apache.falcon.resource.extensions;
 
+import com.sun.jersey.multipart.FormDataBodyPart;
 import com.sun.jersey.multipart.FormDataParam;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.Extension;
 import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.ExtensionService;
@@ -43,6 +47,7 @@ import org.apache.falcon.resource.ExtensionJobList;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.DeploymentUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -79,10 +84,10 @@ import java.util.Properties;
 public class ExtensionManager extends AbstractSchedulableEntityManager {
     public static final Logger LOG = 
LoggerFactory.getLogger(ExtensionManager.class);
 
-    public static final String TAG_PREFIX_EXTENSION_NAME = 
"_falcon_extension_name=";
-    public static final String TAG_PREFIX_EXTENSION_JOB = 
"_falcon_extension_job=";
-    public static final String ASCENDING_SORT_ORDER = "asc";
-    public static final String DESCENDING_SORT_ORDER = "desc";
+    private static final String TAG_PREFIX_EXTENSION_NAME = 
"_falcon_extension_name=";
+    private static final String TAG_PREFIX_EXTENSION_JOB = 
"_falcon_extension_job=";
+    private static final String ASCENDING_SORT_ORDER = "asc";
+    private static final String DESCENDING_SORT_ORDER = "desc";
 
     private Extension extension = new Extension();
     private static final String EXTENSION_RESULTS = "extensions";
@@ -91,7 +96,7 @@ public class ExtensionManager extends 
AbstractSchedulableEntityManager {
     private static final String EXTENSION_NAME = "name";
     private static final String EXTENSION_TYPE = "type";
     private static final String EXTENSION_DESC = "description";
-    public static final String EXTENSION_LOCATION = "location";
+    private static final String EXTENSION_LOCATION = "location";
 
     private static final String EXTENSION_PROPERTY_JSON_SUFFIX = 
"-properties.json";
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -294,36 +299,63 @@ public class ExtensionManager extends 
AbstractSchedulableEntityManager {
 
     @POST
     @Path("submit/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA})
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA,
+            MediaType.APPLICATION_OCTET_STREAM})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
     public APIResult submit(
             @PathParam("extension-name") String extensionName,
             @Context HttpServletRequest request,
             @DefaultValue("") @QueryParam("doAs") String doAsUser,
             @QueryParam("jobName") String jobName,
-            @FormDataParam("entities") List<Entity> entities,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
+        Map<EntityType, List<Entity>> entityMap;
+
         try {
-            entities = getEntityList(extensionName, entities, config);
-            submitEntities(extensionName, doAsUser, jobName, entities, config);
+            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
+            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
         } catch (FalconException | IOException e) {
-            LOG.error("Error when submitting extension job: ", e);
+            LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted successfully" + jobName);
     }
 
-    private void validateEntities(List<Entity> entities) throws 
FalconException {
-        for (Entity entity : entities) {
-            if (!EntityType.FEED.equals(entity.getEntityType()) && 
!EntityType.PROCESS.equals(entity.getEntityType())) {
-                LOG.error("Cluster entity is not allowed for submission via 
submitEntities: {}", entity.getName());
-                throw new FalconException("Cluster entity is not allowed for 
submission in extensions submission");
+    private Map<EntityType, List<Entity>> getEntityList(String extensionName, 
String jobName,
+                                                        List<FormDataBodyPart> 
feedForms,
+                                                        List<FormDataBodyPart> 
processForms, InputStream config)
+        throws FalconException, IOException{
+        List<Entity> processes = getProcesses(processForms);
+        List<Entity> feeds = getFeeds(feedForms);
+        ExtensionType extensionType = getExtensionType(extensionName);
+        List<Entity> entities;
+        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
+        if (ExtensionType.TRUSTED.equals(extensionType)) {
+            entities = generateEntities(extensionName, config);
+            List<Entity> trustedFeeds = new ArrayList<>();
+            List<Entity> trustedProcesses = new ArrayList<>();
+            for (Entity entity : entities) {
+                if (EntityType.FEED.equals(entity.getEntityType())) {
+                    trustedFeeds.add(entity);
+                } else {
+                    trustedProcesses.add(entity);
+                }
             }
-            super.validate(entity);
+            entityMap.put(EntityType.PROCESS, trustedProcesses);
+            entityMap.put(EntityType.FEED, trustedFeeds);
+            return entityMap;
+        } else {
+            EntityUtil.applyTags(extensionName, jobName, processes);
+            EntityUtil.applyTags(extensionName, jobName, feeds);
+            entityMap.put(EntityType.PROCESS, processes);
+            entityMap.put(EntityType.FEED, feeds);
+            return entityMap;
         }
     }
 
+
     private ExtensionType getExtensionType(String extensionName) {
         ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore();
         ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
@@ -339,50 +371,90 @@ public class ExtensionManager extends 
AbstractSchedulableEntityManager {
             @Context HttpServletRequest request,
             @DefaultValue("") @QueryParam("doAs") String doAsUser,
             @QueryParam("jobName") String jobName,
-            @FormDataParam("entities") List<Entity> entities,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
+        Map<EntityType, List<Entity>> entityMap;
         try {
-            entities = getEntityList(extensionName, entities, config);
-            submitEntities(extensionName, doAsUser, jobName, entities, config);
-            for (Entity entity : entities) {
-                scheduleInternal(entity.getEntityType().name(), 
entity.getName(), null, null);
-            }
+            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
+            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
+            scheduleEntities(entityMap);
         } catch (FalconException | IOException e) {
-            LOG.error("Error when submitting extension job: ", e);
+            LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted and scheduled successfully");
     }
 
-    protected void submitEntities(String extensionName, String doAsUser, 
String jobName, List<Entity> entities,
-                                InputStream configStream) throws 
FalconException, IOException {
-        validateEntities(entities);
-        List<String> feeds = new ArrayList<>();
-        List<String> processes = new ArrayList<>();
-        for (Entity entity : entities) {
-            submitInternal(entity, doAsUser);
-            if (EntityType.FEED.equals(entity.getEntityType())) {
-                feeds.add(entity.getName());
-            } else if (EntityType.PROCESS.equals(entity.getEntityType())) {
-                processes.add(entity.getName());
+    private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) {
+        List<Entity> feeds = new ArrayList<>();
+        if (feedForms != null && !feedForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : feedForms) {
+                feeds.add(formDataBodyPart.getValueAs(Feed.class));
             }
         }
-        ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore();
+        return feeds;
+    }
+
+    private List<Entity> getProcesses(List<FormDataBodyPart> processForms) {
+        List<Entity> processes = new ArrayList<>();
+        if (processForms != null && !processForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : processForms) {
+                processes.add(formDataBodyPart.getValueAs(Process.class));
+            }
+        }
+        return processes;
+    }
+
+    protected void submitEntities(String extensionName, String doAsUser, 
String jobName,
+                                  Map<EntityType, List<Entity>> entityMap, 
InputStream configStream)
+        throws FalconException, IOException {
+        List<Entity> feeds = entityMap.get(EntityType.FEED);
+        List<Entity> processes = entityMap.get(EntityType.PROCESS);
+        validateFeeds(feeds);
+        validateProcesses(processes);
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for (Entity feed : feeds) {
+            submitInternal(feed, doAsUser);
+            feedNames.add(feed.getName());
+        }
+        for (Entity process: processes) {
+            submitInternal(process, doAsUser);
+            processNames.add(process.getName());
+        }
+
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         byte[] configBytes = null;
         if (configStream != null) {
             configBytes = IOUtils.toByteArray(configStream);
         }
-        metaStore.storeExtensionJob(jobName, extensionName, feeds, processes, 
configBytes);
+        metaStore.storeExtensionJob(jobName, extensionName, feedNames, 
processNames, configBytes);
     }
 
-    private List<Entity> getEntityList(String extensionName, List<Entity> 
entities, InputStream config)
-        throws FalconException, IOException {
-        ExtensionType extensionType = getExtensionType(extensionName);
-        if (ExtensionType.TRUSTED.equals(extensionType)) {
-            entities = generateEntities(extensionName, config);
+    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) 
throws FalconException,
+            AuthorizationException {
+        for (Object feed: entityMap.get(EntityType.FEED)) {
+            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), 
null, null);
+        }
+        for (Object process: entityMap.get(EntityType.PROCESS)) {
+            scheduleInternal(EntityType.PROCESS.name(), 
((Process)process).getName(), null, null);
+        }
+    }
+
+
+    private void validateFeeds(List<Entity> feeds) throws FalconException {
+        for (Entity feed : feeds) {
+            super.validate(feed);
+        }
+    }
+
+    private void validateProcesses(List<Entity> processes) throws 
FalconException {
+        ProcessEntityParser processEntityParser = new ProcessEntityParser();
+        for (Entity process : processes) {
+            processEntityParser.validate((Process)process, false);
         }
-        return entities;
     }
 
     @POST

http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java 
b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 00c2ad1..0eb0ab3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -58,6 +58,7 @@ import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -284,13 +285,30 @@ public class FalconUnitClient extends 
AbstractFalconClient {
 
         InputStream configStream = getServletInputStream(configPath);
         try {
-            List<Entity> entities = getEntities(extensionName, jobName, 
configStream);
-            return localExtensionManager.submitExtensionJob(extensionName, 
jobName, configStream, entities);
+            Map<EntityType, List<Entity>> entityMap = 
getEntityTypeListMap(extensionName, jobName, configStream);
+            return localExtensionManager.submitExtensionJob(extensionName, 
jobName, configStream, entityMap);
         } catch (FalconException | IOException e) {
             throw new FalconCLIException("Failed in submitting extension job " 
+ jobName);
         }
     }
 
+    private Map<EntityType, List<Entity>> getEntityTypeListMap(String 
extensionName, String jobName, InputStream configStream) {
+        List<Entity> entities = getEntities(extensionName, jobName, 
configStream);
+        List<Entity> feeds = new ArrayList<>();
+        List<Entity> processes = new ArrayList<>();
+        for (Entity entity : entities) {
+            if (EntityType.FEED.equals(entity.getEntityType())) {
+                feeds.add(entity);
+            } else if (EntityType.PROCESS.equals(entity.getEntityType())) {
+                processes.add(entity);
+            }
+        }
+        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
+        entityMap.put(EntityType.PROCESS, processes);
+        entityMap.put(EntityType.FEED, feeds);
+        return entityMap;
+    }
+
     private List<Entity> getEntities(String extensionName, String jobName, 
InputStream configStream) {
         String packagePath = 
ExtensionStore.get().getMetaStore().getDetail(extensionName).getLocation();
         List<Entity> entities;
@@ -308,9 +326,9 @@ public class FalconUnitClient extends AbstractFalconClient {
                                                    String doAsUser) {
         InputStream configStream = getServletInputStream(configPath);
         try {
-            List<Entity> entities = getEntities(extensionName, jobName, 
configStream);
+            Map<EntityType, List<Entity>> entityMap = 
getEntityTypeListMap(extensionName, jobName, configStream);
             return 
localExtensionManager.submitAndSchedulableExtensionJob(extensionName, jobName, 
configStream,
-                    entities);
+                    entityMap);
         } catch (FalconException | IOException e) {
             throw new FalconCLIException("Failed in submitting extension job " 
+ jobName);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3e7e08f7/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java 
b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 5d2710c..553e7d6 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -20,12 +20,14 @@ package org.apache.falcon.unit;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.extensions.ExtensionManager;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A proxy implementation of the extension operations in local mode.
@@ -33,18 +35,17 @@ import java.util.List;
 public class LocalExtensionManager extends ExtensionManager {
     public LocalExtensionManager() {}
 
-    public APIResult submitExtensionJob(String extensionName, String jobName, 
InputStream config, List<Entity> entities)
-        throws FalconException, IOException {
-        submitEntities(extensionName, null, jobName, entities, config);
+    public APIResult submitExtensionJob(String extensionName, String jobName, 
InputStream config,
+                                        Map<EntityType, List<Entity>> 
entityMap) throws FalconException, IOException {
+        submitEntities(extensionName, null, jobName, entityMap, config);
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted successfully" + jobName);
     }
 
     public APIResult submitAndSchedulableExtensionJob(String extensionName, 
String jobName, InputStream config,
-                                                      List<Entity> entities) 
throws FalconException, IOException {
-        submitEntities(extensionName, null, jobName, entities, config);
-        for (Entity entity : entities) {
-            scheduleInternal(entity.getEntityType().name(), entity.getName(), 
null, null);
-        }
+                                                      Map<EntityType, 
List<Entity>> entityMap)
+        throws FalconException, IOException {
+        submitEntities(extensionName, null, jobName, entityMap, config);
+        scheduleEntities(entityMap);
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted successfully" + jobName);
     }
 

Reply via email to