FALCON-2223 Distributed mode support for User Extension

Author: Praveen Adlakha <[email protected]>

Reviewers: @sandeepSamudrala, @pallavi-rao

Closes #323 from PraveenAdlakha/extensionManagerSupport and squashes the 
following commits:

9b6bed9 [Praveen Adlakha] merge conflicts resolved
2a18163 [Praveen Adlakha] comments addresed
5ef0ff2 [Praveen Adlakha] FALCON-2223 Distributed mode support for User 
Extension


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cdb5404a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cdb5404a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cdb5404a

Branch: refs/heads/master
Commit: cdb5404aece64abe35c79dbf9cfa7ce9085e2cbc
Parents: d1052bf
Author: Praveen Adlakha <[email protected]>
Authored: Mon Dec 19 12:05:26 2016 +0530
Committer: Praveen Adlakha <[email protected]>
Committed: Mon Dec 19 12:05:26 2016 +0530

----------------------------------------------------------------------
 common/src/main/resources/startup.properties    |   2 +-
 .../resource/AbstractExtensionManager.java      | 180 +++++
 .../resource/extensions/ExtensionManager.java   | 749 -------------------
 .../resource/proxy/ExtensionManagerProxy.java   | 694 +++++++++++++++++
 .../proxy/SchedulableEntityManagerProxy.java    |   6 +-
 prism/src/main/webapp/WEB-INF/web.xml           |   3 +-
 src/conf/startup.properties                     |   2 +-
 .../falcon/unit/LocalExtensionManager.java      |   7 +-
 .../falcon/resource/ExtensionManager.java       |  88 +++
 .../src/main/webapp/WEB-INF/distributed/web.xml |   2 +-
 10 files changed, 973 insertions(+), 760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties 
b/common/src/main/resources/startup.properties
index 01386f1..f899905 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -51,7 +51,7 @@
 #org.apache.falcon.metadata.MetadataMappingService,\
 
 ##Add if you want to use Trusted or User Extensions
-## In case of distributed Mode enable ExtensionService only on Prism
+## In case of distributed Mode enable ExtensionService only on Prism via 
prism.application.services
 ## It should come after FalconJPAService in application services
 #org.apache.falcon.extensions.ExtensionService,\
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java 
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
new file mode 100644
index 0000000..f0f21a9
--- /dev/null
+++ 
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
+import org.apache.falcon.extensions.store.ExtensionStore;
+import org.apache.falcon.persistence.ExtensionJobsBean;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A base class for managing Extension Operations.
+ */
+public class AbstractExtensionManager extends AbstractSchedulableEntityManager 
{
+    public static final Logger LOG = 
LoggerFactory.getLogger(AbstractExtensionManager.class);
+
+    private static final String JOB_NAME = "jobName";
+    public static final String TAG_PREFIX_EXTENSION_JOB = 
"_falcon_extension_job=";
+    private static final String EXTENSION_NAME = "extensionName";
+    private static final String FEEDS = "feeds";
+    private static final String PROCESSES = "processes";
+    private static final String CONFIG  = "config";
+    private static final String CREATION_TIME  = "creationTime";
+    private static final String LAST_UPDATE_TIME  = "lastUpdatedTime";
+
+    public static void validateExtensionName(final String extensionName) {
+        if (StringUtils.isBlank(extensionName)) {
+            throw FalconWebException.newAPIException("Extension name is 
mandatory and shouldn't be blank",
+                    Response.Status.BAD_REQUEST);
+        }
+    }
+
+    public String registerExtensionMetadata(String extensionName, String path, 
String description, String owner) {
+        validateExtensionName(extensionName);
+        try {
+            return ExtensionStore.get().registerExtension(extensionName, path, 
description, owner);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    public String getExtensionJobDetail(String jobName) {
+        try {
+            return buildExtensionJobDetailResult(jobName).toString();
+        } catch (FalconException e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    public String deleteExtensionMetadata(String extensionName){
+        validateExtensionName(extensionName);
+        try {
+            return ExtensionStore.get().deleteExtension(extensionName, 
CurrentUser.getUser());
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private JSONObject buildExtensionJobDetailResult(final String jobName) 
throws FalconException {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
+        if (jobsBean == null) {
+            throw new ValidationException("Job name not found:" + jobName);
+        }
+        JSONObject detailsObject = new JSONObject();
+        try {
+            detailsObject.put(JOB_NAME, jobsBean.getJobName());
+            detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName());
+            detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), 
","));
+            detailsObject.put(PROCESSES, 
StringUtils.join(jobsBean.getProcesses(), ","));
+            detailsObject.put(CONFIG, jobsBean.getConfig());
+            detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
+            detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
+        } catch (JSONException e) {
+            LOG.error("Exception while building extension jon details for job 
{}", jobName, e);
+        }
+        return detailsObject;
+    }
+
+
+    protected void submitEntities(String extensionName, String doAsUser, 
String jobName,
+                                  Map<EntityType, List<Entity>> entityMap, 
InputStream configStream)
+        throws FalconException, IOException {
+        List<Entity> feeds = entityMap.get(EntityType.FEED);
+        List<Entity> processes = entityMap.get(EntityType.PROCESS);
+        validateFeeds(feeds);
+        validateProcesses(processes);
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for (Entity feed : feeds) {
+            submitInternal(feed, doAsUser);
+            feedNames.add(feed.getName());
+        }
+        for (Entity process: processes) {
+            submitInternal(process, doAsUser);
+            processNames.add(process.getName());
+        }
+
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
+        metaStore.storeExtensionJob(jobName, extensionName, feedNames, 
processNames, configBytes);
+    }
+
+
+    private void validateFeeds(List<Entity> feeds) throws FalconException {
+        for (Entity feed : feeds) {
+            super.validate(feed);
+        }
+    }
+
+    private void validateProcesses(List<Entity> processes) throws 
FalconException {
+        ProcessEntityParser processEntityParser = new ProcessEntityParser();
+        for (Entity process : processes) {
+            processEntityParser.validate((Process)process, false);
+        }
+    }
+
+    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) 
throws FalconException,
+            AuthorizationException {
+        for (Object feed: entityMap.get(EntityType.FEED)) {
+            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), 
null, null);
+        }
+        for (Object process: entityMap.get(EntityType.PROCESS)) {
+            scheduleInternal(EntityType.PROCESS.name(), 
((Process)process).getName(), null, null);
+        }
+    }
+
+    public static String getJobNameFromTag(String tags) {
+        int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
+        if (nameStart == -1) {
+            return null;
+        }
+
+        nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length();
+        int nameEnd = tags.indexOf(',', nameStart);
+        if (nameEnd == -1) {
+            nameEnd = tags.length();
+        }
+        return tags.substring(nameStart, nameEnd);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
deleted file mode 100644
index 29859de..0000000
--- 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource.extensions;
-
-import com.sun.jersey.multipart.FormDataBodyPart;
-import com.sun.jersey.multipart.FormDataParam;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconWebException;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
-import org.apache.falcon.entity.parser.ValidationException;
-import org.apache.falcon.entity.store.StoreAccessException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.extensions.Extension;
-import org.apache.falcon.extensions.ExtensionProperties;
-import org.apache.falcon.extensions.ExtensionService;
-import org.apache.falcon.extensions.ExtensionType;
-import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
-import org.apache.falcon.extensions.store.ExtensionStore;
-import org.apache.falcon.persistence.ExtensionBean;
-import org.apache.falcon.persistence.ExtensionJobsBean;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.AbstractSchedulableEntityManager;
-import org.apache.falcon.resource.EntityList;
-import org.apache.falcon.resource.ExtensionInstanceList;
-import org.apache.falcon.resource.ExtensionJobList;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Jersey Resource for extension job operations.
- */
-@Path("extension")
-public class ExtensionManager extends AbstractSchedulableEntityManager {
-    public static final Logger LOG = 
LoggerFactory.getLogger(ExtensionManager.class);
-
-    private static final String TAG_PREFIX_EXTENSION_NAME = 
"_falcon_extension_name=";
-    private static final String TAG_PREFIX_EXTENSION_JOB = 
"_falcon_extension_job=";
-    private static final String ASCENDING_SORT_ORDER = "asc";
-    private static final String DESCENDING_SORT_ORDER = "desc";
-
-    private Extension extension = new Extension();
-
-    private static final String EXTENSION_RESULTS = "extensions";
-    private static final String TOTAL_RESULTS = "totalResults";
-    private static final String README = "README";
-    private static final String NAME = "name";
-    private static final String EXTENSION_TYPE = "type";
-    private static final String EXTENSION_DESC = "description";
-    private static final String EXTENSION_LOCATION = "location";
-    private static final String JOB_NAME = "jobName";
-
-    private static final String EXTENSION_NAME = "extensionName";
-    private static final String FEEDS = "feeds";
-    private static final String PROCESSES = "processes";
-    private static final String CONFIG  = "config";
-    private static final String CREATION_TIME  = "creationTime";
-    private static final String LAST_UPDATE_TIME  = "lastUpdatedTime";
-
-
-    private static final String EXTENSION_PROPERTY_JSON_SUFFIX = 
"-properties.json";
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    @GET
-    @Path("list/{extension-name}")
-    @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
-    public ExtensionJobList getExtensionJobs(
-            @PathParam("extension-name") String extensionName,
-            @DefaultValue("") @QueryParam("fields") String fields,
-            @DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") 
String sortOrder,
-            @DefaultValue("0") @QueryParam("offset") Integer offset,
-            @QueryParam("numResults") Integer resultsPerPage,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : 
resultsPerPage;
-        try {
-            // get filtered entities
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_NAME + extensionName, "", doAsUser);
-            if (entities.isEmpty()) {
-                return new ExtensionJobList(0);
-            }
-
-            // group entities by extension job name
-            Map<String, List<Entity>> groupedEntities = 
groupEntitiesByJob(entities);
-
-            // sort by extension job name
-            List<String> jobNames = new ArrayList<>(groupedEntities.keySet());
-            switch (sortOrder.toLowerCase()) {
-            case DESCENDING_SORT_ORDER :
-                Collections.sort(jobNames, 
Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
-                break;
-            default:
-                Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
-            }
-
-            // pagination and format output
-            int pageCount = getRequiredNumberOfResults(jobNames.size(), 
offset, resultsPerPage);
-            HashSet<String> fieldSet = new 
HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
-            ExtensionJobList jobList = new ExtensionJobList(pageCount);
-            for (int i = offset; i < offset + pageCount; i++) {
-                String jobName = jobNames.get(i);
-                List<Entity> jobEntities = groupedEntities.get(jobName);
-                EntityList entityList = new 
EntityList(buildEntityElements(fieldSet, jobEntities), jobEntities.size());
-                jobList.addJob(new ExtensionJobList.JobElement(jobName, 
entityList));
-            }
-            return jobList;
-        } catch (FalconException | IOException e) {
-            LOG.error("Failed to get extension job list of " + extensionName + 
": ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @GET
-    @Path("instances/{job-name}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public ExtensionInstanceList getInstances(
-            @PathParam("job-name") final String jobName,
-            @QueryParam("start") final String nominalStart,
-            @QueryParam("end") final String nominalEnd,
-            @DefaultValue("") @QueryParam("instanceStatus") String 
instanceStatus,
-            @DefaultValue("") @QueryParam("fields") String fields,
-            @DefaultValue("") @QueryParam("orderBy") String orderBy,
-            @DefaultValue("") @QueryParam("sortOrder") String sortOrder,
-            @DefaultValue("0") @QueryParam("offset") final Integer offset,
-            @QueryParam("numResults") Integer resultsPerPage,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : 
resultsPerPage;
-        try {
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                return new ExtensionInstanceList(0);
-            }
-
-            HashSet<String> fieldSet = new 
HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
-            ExtensionInstanceList instances = new 
ExtensionInstanceList(entities.size());
-            for (Entity entity : entities) {
-                InstancesResult entityInstances = super.getStatus(
-                        entity.getEntityType().name(), entity.getName(), 
nominalStart, nominalEnd,
-                        null, null, "STATUS:" + instanceStatus, orderBy, 
sortOrder, offset, resultsPerPage, null);
-                instances.addEntitySummary(new 
ExtensionInstanceList.EntitySummary(
-                        getEntityElement(entity, fieldSet), 
entityInstances.getInstances()));
-            }
-            return instances;
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when listing instances of extension job: " + 
jobName + ": ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @POST
-    @Path("schedule/{job-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult schedule(@PathParam("job-name") String jobName,
-                              @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        try {
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
-            }
-
-            for (Entity entity : entities) {
-                scheduleInternal(entity.getEntityType().name(), 
entity.getName(), null, null);
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when scheduling extension job: " + jobName + ": 
", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " scheduled successfully");
-    }
-
-    @POST
-    @Path("suspend/{job-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult suspend(@PathParam("job-name") String jobName,
-                             @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        try {
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
-            }
-
-            for (Entity entity : entities) {
-                if (entity.getEntityType().isSchedulable()) {
-                    if (getWorkflowEngine(entity).isActive(entity)) {
-                        getWorkflowEngine(entity).suspend(entity);
-                    }
-                }
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when scheduling extension job: " + jobName + ": 
", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " suspended successfully");
-    }
-
-    @POST
-    @Path("resume/{job-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult resume(@PathParam("job-name") String jobName,
-                            @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        try {
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
-            }
-
-            for (Entity entity : entities) {
-                if (entity.getEntityType().isSchedulable()) {
-                    if (getWorkflowEngine(entity).isSuspended(entity)) {
-                        getWorkflowEngine(entity).resume(entity);
-                    }
-                }
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when resuming extension job " + jobName + ": ", 
e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " resumed successfully");
-    }
-
-    @POST
-    @Path("delete/{job-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult delete(@PathParam("job-name") String jobName,
-                            @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        try {
-            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.SUCCEEDED,
-                        "Extension job " + jobName + " doesn't exist. Nothing 
to delete.");
-            }
-
-            for (Entity entity : entities) {
-                // TODO(yzheng): need to remember the entity dependency graph 
for clean ordered removal
-                canRemove(entity);
-                if (entity.getEntityType().isSchedulable() && 
!DeploymentUtil.isPrism()) {
-                    getWorkflowEngine(entity).delete(entity);
-                }
-                configStore.remove(entity.getEntityType(), entity.getName());
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when deleting extension job: " + jobName + ": ", 
e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " deleted successfully");
-    }
-
-    @POST
-    @Path("submit/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA,
-            MediaType.APPLICATION_OCTET_STREAM})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult submit(
-            @PathParam("extension-name") String extensionName,
-            @Context HttpServletRequest request,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser,
-            @QueryParam("jobName") String jobName,
-            @FormDataParam("processes") List<FormDataBodyPart> processForms,
-            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
-            @FormDataParam("config") InputStream config) {
-        checkIfExtensionServiceIsEnabled();
-        Map<EntityType, List<Entity>> entityMap;
-
-        try {
-            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
-            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
-        } catch (FalconException | IOException e) {
-            LOG.error("Error while submitting extension job: ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted successfully:" + jobName);
-    }
-
-    private Map<EntityType, List<Entity>> getEntityList(String extensionName, 
String jobName,
-                                                        List<FormDataBodyPart> 
feedForms,
-                                                        List<FormDataBodyPart> 
processForms, InputStream config)
-        throws FalconException, IOException{
-        List<Entity> processes = getProcesses(processForms);
-        List<Entity> feeds = getFeeds(feedForms);
-        ExtensionType extensionType = getExtensionType(extensionName);
-        List<Entity> entities;
-        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
-        if (ExtensionType.TRUSTED.equals(extensionType)) {
-            entities = generateEntities(extensionName, config);
-            List<Entity> trustedFeeds = new ArrayList<>();
-            List<Entity> trustedProcesses = new ArrayList<>();
-            for (Entity entity : entities) {
-                if (EntityType.FEED.equals(entity.getEntityType())) {
-                    trustedFeeds.add(entity);
-                } else {
-                    trustedProcesses.add(entity);
-                }
-            }
-            entityMap.put(EntityType.PROCESS, trustedProcesses);
-            entityMap.put(EntityType.FEED, trustedFeeds);
-            return entityMap;
-        } else {
-            EntityUtil.applyTags(extensionName, jobName, processes);
-            EntityUtil.applyTags(extensionName, jobName, feeds);
-            entityMap.put(EntityType.PROCESS, processes);
-            entityMap.put(EntityType.FEED, feeds);
-            return entityMap;
-        }
-    }
-
-
-    private ExtensionType getExtensionType(String extensionName) {
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
-        return extensionDetails.getExtensionType();
-    }
-
-    @POST
-    @Path("submitAndSchedule/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult submitAndSchedule(
-            @PathParam("extension-name") String extensionName,
-            @Context HttpServletRequest request,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser,
-            @QueryParam("jobName") String jobName,
-            @FormDataParam("processes") List<FormDataBodyPart> processForms,
-            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
-            @FormDataParam("config") InputStream config) {
-        checkIfExtensionServiceIsEnabled();
-        Map<EntityType, List<Entity>> entityMap;
-        try {
-            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
-            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
-            scheduleEntities(entityMap);
-        } catch (FalconException | IOException e) {
-            LOG.error("Error while submitting extension job: ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted and scheduled successfully");
-    }
-
-    private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) {
-        List<Entity> feeds = new ArrayList<>();
-        if (feedForms != null && !feedForms.isEmpty()) {
-            for (FormDataBodyPart formDataBodyPart : feedForms) {
-                feeds.add(formDataBodyPart.getValueAs(Feed.class));
-            }
-        }
-        return feeds;
-    }
-
-    private List<Entity> getProcesses(List<FormDataBodyPart> processForms) {
-        List<Entity> processes = new ArrayList<>();
-        if (processForms != null && !processForms.isEmpty()) {
-            for (FormDataBodyPart formDataBodyPart : processForms) {
-                processes.add(formDataBodyPart.getValueAs(Process.class));
-            }
-        }
-        return processes;
-    }
-
-    protected void submitEntities(String extensionName, String doAsUser, 
String jobName,
-                                  Map<EntityType, List<Entity>> entityMap, 
InputStream configStream)
-        throws FalconException, IOException {
-        List<Entity> feeds = entityMap.get(EntityType.FEED);
-        List<Entity> processes = entityMap.get(EntityType.PROCESS);
-        validateFeeds(feeds);
-        validateProcesses(processes);
-        List<String> feedNames = new ArrayList<>();
-        List<String> processNames = new ArrayList<>();
-        for (Entity feed : feeds) {
-            submitInternal(feed, doAsUser);
-            feedNames.add(feed.getName());
-        }
-        for (Entity process: processes) {
-            submitInternal(process, doAsUser);
-            processNames.add(process.getName());
-        }
-
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        byte[] configBytes = null;
-        if (configStream != null) {
-            configBytes = IOUtils.toByteArray(configStream);
-        }
-        metaStore.storeExtensionJob(jobName, extensionName, feedNames, 
processNames, configBytes);
-    }
-
-    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) 
throws FalconException,
-            AuthorizationException {
-        for (Object feed: entityMap.get(EntityType.FEED)) {
-            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), 
null, null);
-        }
-        for (Object process: entityMap.get(EntityType.PROCESS)) {
-            scheduleInternal(EntityType.PROCESS.name(), 
((Process)process).getName(), null, null);
-        }
-    }
-
-
-    private void validateFeeds(List<Entity> feeds) throws FalconException {
-        for (Entity feed : feeds) {
-            super.validate(feed);
-        }
-    }
-
-    private void validateProcesses(List<Entity> processes) throws 
FalconException {
-        ProcessEntityParser processEntityParser = new ProcessEntityParser();
-        for (Entity process : processes) {
-            processEntityParser.validate((Process)process, false);
-        }
-    }
-
-    @POST
-    @Path("update/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult update(
-            @PathParam("extension-name") String extensionName,
-            @Context HttpServletRequest request,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        try {
-            List<Entity> entities = generateEntities(extensionName, 
request.getInputStream());
-            for (Entity entity : entities) {
-                super.update(entity, entity.getEntityType().name(), 
entity.getName(), null);
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when updating extension job: ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Updated 
successfully");
-    }
-
-    @POST
-    @Path("validate/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
-    public APIResult validate(
-            @PathParam("extension-name") String extensionName,
-            @Context HttpServletRequest request,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
-        checkIfExtensionServiceIsEnabled();
-        ExtensionType extensionType = getExtensionType(extensionName);
-        if (!ExtensionType.TRUSTED.equals(extensionType)) {
-            throw FalconWebException.newAPIException("Extension validation is 
supported only for trusted extensions");
-        }
-        try {
-            List<Entity> entities = generateEntities(extensionName, 
request.getInputStream());
-            for (Entity entity : entities) {
-                super.validate(entity);
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when validating extension job: ", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Validated 
successfully");
-    }
-
-
-    // Extension store related REST API's
-    @GET
-    @Path("enumerate")
-    @Produces({MediaType.APPLICATION_JSON})
-    public Response getExtensions() {
-        checkIfExtensionServiceIsEnabled();
-        JSONArray results;
-
-        try {
-            results = buildEnumerateResult();
-        } catch (StoreAccessException e) {
-            LOG.error("Failed when accessing extension store.", e);
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        } catch (FalconException e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-
-        try {
-            JSONObject response = new JSONObject();
-            response.put(EXTENSION_RESULTS, results);
-            response.put(TOTAL_RESULTS, results.length());
-
-            return Response.ok(response).build();
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @GET
-    @Path("describe/{extension-name}")
-    @Produces(MediaType.TEXT_PLAIN)
-    public String getExtensionDescription(
-            @PathParam("extension-name") String extensionName) {
-        checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
-        try {
-            return ExtensionStore.get().getResource(extensionName, README);
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @GET
-    @Path("detail/{extension-name}")
-    @Produces({MediaType.APPLICATION_JSON})
-    public Response getDetail(@PathParam("extension-name") String 
extensionName) {
-        checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
-        try {
-            return 
Response.ok(buildExtensionDetailResult(extensionName)).build();
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @GET
-    @Path("extensionJobDetails/{job-name}")
-    @Produces({MediaType.APPLICATION_JSON})
-    public String getExtensionJobDetail(@PathParam("job-name") String jobName) 
{
-        checkIfExtensionServiceIsEnabled();
-        try {
-            return buildExtensionJobDetailResult(jobName).toString();
-        } catch (FalconException e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @POST
-    @Path("unregister/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces(MediaType.TEXT_PLAIN)
-    public String deleteExtensionMetadata(
-            @PathParam("extension-name") String extensionName){
-        checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
-        try {
-            return ExtensionStore.get().deleteExtension(extensionName, 
CurrentUser.getUser());
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @POST
-    @Path("register/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
-    @Produces(MediaType.TEXT_PLAIN)
-    public String registerExtensionMetadata(
-            @PathParam("extension-name") String extensionName,
-            @QueryParam("path") String path,
-            @QueryParam("description") String description) {
-        checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
-        try {
-            return ExtensionStore.get().registerExtension(extensionName, path, 
description, CurrentUser.getUser());
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    @GET
-    @Path("definition/{extension-name}")
-    @Produces({MediaType.APPLICATION_JSON})
-    public String getExtensionDefinition(
-            @PathParam("extension-name") String extensionName) {
-        checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
-        try {
-            return ExtensionStore.get().getResource(extensionName,
-                    extensionName.toLowerCase() + 
EXTENSION_PROPERTY_JSON_SUFFIX);
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
-        }
-    }
-
-    private static void validateExtensionName(final String extensionName) {
-        if (StringUtils.isBlank(extensionName)) {
-            throw FalconWebException.newAPIException("Extension name is 
mandatory and shouldn't be blank",
-                    Response.Status.BAD_REQUEST);
-        }
-    }
-
-    private static JSONArray buildEnumerateResult() throws FalconException {
-        JSONArray results = new JSONArray();
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        List<ExtensionBean> extensionBeanList = metaStore.getAllExtensions();
-        for (ExtensionBean extensionBean : extensionBeanList) {
-            JSONObject resultObject = new JSONObject();
-
-            try {
-                resultObject.put(NAME, 
extensionBean.getExtensionName().toLowerCase());
-                resultObject.put(EXTENSION_TYPE, 
extensionBean.getExtensionType());
-                resultObject.put(EXTENSION_DESC, 
extensionBean.getDescription());
-                resultObject.put(EXTENSION_LOCATION, 
extensionBean.getLocation());
-            } catch (JSONException e) {
-                throw new FalconException(e);
-            }
-            results.put(resultObject);
-
-        }
-        return results;
-    }
-
-    private List<Entity> generateEntities(String extensionName, InputStream 
configStream)
-        throws FalconException, IOException {
-        // get entities for extension job
-        Properties properties = new Properties();
-        properties.load(configStream);
-        List<Entity> entities = extension.getEntities(extensionName, 
configStream);
-
-        // add tags on extension name and job
-        String jobName = 
properties.getProperty(ExtensionProperties.JOB_NAME.getName());
-        EntityUtil.applyTags(extensionName, jobName, entities);
-
-        return entities;
-    }
-
-    private JSONObject buildExtensionJobDetailResult(final String jobName) 
throws FalconException {
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
-        if (jobsBean == null) {
-            throw new ValidationException("Job name not found:" + jobName);
-        }
-        JSONObject detailsObject = new JSONObject();
-        try {
-            detailsObject.put(JOB_NAME, jobsBean.getJobName());
-            detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName());
-            detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), 
","));
-            detailsObject.put(PROCESSES, 
StringUtils.join(jobsBean.getProcesses(), ","));
-            detailsObject.put(CONFIG, jobsBean.getConfig());
-            detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
-            detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
-        } catch (JSONException e) {
-            LOG.error("Exception while building extension jon details for job 
{}", jobName, e);
-        }
-        return detailsObject;
-    }
-
-    private JSONObject buildExtensionDetailResult(final String extensionName) 
throws FalconException {
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-
-        if (!metaStore.checkIfExtensionExists(extensionName)){
-            throw new ValidationException("No extension resources found for " 
+ extensionName);
-        }
-
-        ExtensionBean bean = metaStore.getDetail(extensionName);
-        JSONObject resultObject = new JSONObject();
-        try {
-            resultObject.put(NAME, bean.getExtensionName());
-            resultObject.put(EXTENSION_TYPE, bean.getExtensionType());
-            resultObject.put(EXTENSION_DESC, bean.getDescription());
-            resultObject.put(EXTENSION_LOCATION, bean.getLocation());
-        } catch (JSONException e) {
-            LOG.error("Exception in buildDetailResults:", e);
-            throw new FalconException(e);
-        }
-        return resultObject;
-    }
-
-    private Map<String, List<Entity>> groupEntitiesByJob(List<Entity> 
entities) {
-        Map<String, List<Entity>> groupedEntities = new HashMap<>();
-        for (Entity entity : entities) {
-            String jobName = getJobNameFromTag(entity.getTags());
-            if (!groupedEntities.containsKey(jobName)) {
-                groupedEntities.put(jobName, new ArrayList<Entity>());
-            }
-            groupedEntities.get(jobName).add(entity);
-        }
-        return groupedEntities;
-    }
-
-    public static String getJobNameFromTag(String tags) {
-        int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
-        if (nameStart == -1) {
-            return null;
-        }
-
-        nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length();
-        int nameEnd = tags.indexOf(',', nameStart);
-        if (nameEnd == -1) {
-            nameEnd = tags.length();
-        }
-        return tags.substring(nameStart, nameEnd);
-    }
-
-    private static void checkIfExtensionServiceIsEnabled() {
-        if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
-            throw FalconWebException.newAPIException(
-                    ExtensionService.SERVICE_NAME + " is not enabled.", 
Response.Status.NOT_FOUND);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
 
b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
new file mode 100644
index 0000000..5e556a1
--- /dev/null
+++ 
b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.proxy;
+
+import com.sun.jersey.multipart.FormDataBodyPart;
+import com.sun.jersey.multipart.FormDataParam;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.store.StoreAccessException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.extensions.Extension;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.extensions.ExtensionService;
+import org.apache.falcon.extensions.ExtensionType;
+import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
+import org.apache.falcon.extensions.store.ExtensionStore;
+import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.AbstractExtensionManager;
+import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.ExtensionInstanceList;
+import org.apache.falcon.resource.ExtensionJobList;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.DeploymentUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Jersey Resource for extension job operations.
+ */
+@Path("extension")
+public class ExtensionManagerProxy extends AbstractExtensionManager {
+    public static final Logger LOG = 
LoggerFactory.getLogger(ExtensionManagerProxy.class);
+
+    private static final String TAG_PREFIX_EXTENSION_NAME = 
"_falcon_extension_name=";
+    private static final String ASCENDING_SORT_ORDER = "asc";
+    private static final String DESCENDING_SORT_ORDER = "desc";
+
+    private Extension extension = new Extension();
+
+    private static final String EXTENSION_RESULTS = "extensions";
+    private static final String TOTAL_RESULTS = "totalResults";
+    private static final String README = "README";
+    private static final String NAME = "name";
+    private static final String EXTENSION_TYPE = "type";
+    private static final String EXTENSION_DESC = "description";
+    private static final String EXTENSION_LOCATION = "location";
+
+
+
+    private static final String EXTENSION_PROPERTY_JSON_SUFFIX = 
"-properties.json";
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    @GET
+    @Path("list/{extension-name}")
+    @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+    public ExtensionJobList getExtensionJobs(
+            @PathParam("extension-name") String extensionName,
+            @DefaultValue("") @QueryParam("fields") String fields,
+            @DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") 
String sortOrder,
+            @DefaultValue("0") @QueryParam("offset") Integer offset,
+            @QueryParam("numResults") Integer resultsPerPage,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : 
resultsPerPage;
+        try {
+            // get filtered entities
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_NAME + extensionName, "", doAsUser);
+            if (entities.isEmpty()) {
+                return new ExtensionJobList(0);
+            }
+
+            // group entities by extension job name
+            Map<String, List<Entity>> groupedEntities = 
groupEntitiesByJob(entities);
+
+            // sort by extension job name
+            List<String> jobNames = new ArrayList<>(groupedEntities.keySet());
+            switch (sortOrder.toLowerCase()) {
+            case DESCENDING_SORT_ORDER :
+                Collections.sort(jobNames, 
Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
+                break;
+            default:
+                Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
+            }
+
+            // pagination and format output
+            int pageCount = getRequiredNumberOfResults(jobNames.size(), 
offset, resultsPerPage);
+            HashSet<String> fieldSet = new 
HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
+            ExtensionJobList jobList = new ExtensionJobList(pageCount);
+            for (int i = offset; i < offset + pageCount; i++) {
+                String jobName = jobNames.get(i);
+                List<Entity> jobEntities = groupedEntities.get(jobName);
+                EntityList entityList = new 
EntityList(buildEntityElements(fieldSet, jobEntities), jobEntities.size());
+                jobList.addJob(new ExtensionJobList.JobElement(jobName, 
entityList));
+            }
+            return jobList;
+        } catch (FalconException | IOException e) {
+            LOG.error("Failed to get extension job list of " + extensionName + 
": ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("instances/{job-name}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public ExtensionInstanceList getInstances(
+            @PathParam("job-name") final String jobName,
+            @QueryParam("start") final String nominalStart,
+            @QueryParam("end") final String nominalEnd,
+            @DefaultValue("") @QueryParam("instanceStatus") String 
instanceStatus,
+            @DefaultValue("") @QueryParam("fields") String fields,
+            @DefaultValue("") @QueryParam("orderBy") String orderBy,
+            @DefaultValue("") @QueryParam("sortOrder") String sortOrder,
+            @DefaultValue("0") @QueryParam("offset") final Integer offset,
+            @QueryParam("numResults") Integer resultsPerPage,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : 
resultsPerPage;
+        try {
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            if (entities.isEmpty()) {
+                return new ExtensionInstanceList(0);
+            }
+
+            HashSet<String> fieldSet = new 
HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
+            ExtensionInstanceList instances = new 
ExtensionInstanceList(entities.size());
+            for (Entity entity : entities) {
+                InstancesResult entityInstances = super.getStatus(
+                        entity.getEntityType().name(), entity.getName(), 
nominalStart, nominalEnd,
+                        null, null, "STATUS:" + instanceStatus, orderBy, 
sortOrder, offset, resultsPerPage, null);
+                instances.addEntitySummary(new 
ExtensionInstanceList.EntitySummary(
+                        getEntityElement(entity, fieldSet), 
entityInstances.getInstances()));
+            }
+            return instances;
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when listing instances of extension job: " + 
jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @POST
+    @Path("schedule/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult schedule(@PathParam("job-name") String jobName,
+                              @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            if (entities.isEmpty()) {
+                // return failure if the extension job doesn't exist
+                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
+            }
+
+            for (Entity entity : entities) {
+                scheduleInternal(entity.getEntityType().name(), 
entity.getName(), null, null);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when scheduling extension job: " + jobName + ": 
", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " scheduled successfully");
+    }
+
+    @POST
+    @Path("suspend/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult suspend(@PathParam("job-name") String jobName,
+                             @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            if (entities.isEmpty()) {
+                // return failure if the extension job doesn't exist
+                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
+            }
+
+            for (Entity entity : entities) {
+                if (entity.getEntityType().isSchedulable()) {
+                    if (getWorkflowEngine(entity).isActive(entity)) {
+                        getWorkflowEngine(entity).suspend(entity);
+                    }
+                }
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when scheduling extension job: " + jobName + ": 
", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " suspended successfully");
+    }
+
+    @POST
+    @Path("resume/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult resume(@PathParam("job-name") String jobName,
+                            @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            if (entities.isEmpty()) {
+                // return failure if the extension job doesn't exist
+                return new APIResult(APIResult.Status.FAILED, "Extension job " 
+ jobName + " doesn't exist.");
+            }
+
+            for (Entity entity : entities) {
+                if (entity.getEntityType().isSchedulable()) {
+                    if (getWorkflowEngine(entity).isSuspended(entity)) {
+                        getWorkflowEngine(entity).resume(entity);
+                    }
+                }
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when resuming extension job " + jobName + ": ", 
e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " resumed successfully");
+    }
+
+    @POST
+    @Path("delete/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult delete(@PathParam("job-name") String jobName,
+                            @DefaultValue("") @QueryParam("doAs") String 
doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            List<Entity> entities = getEntityList("", "", "", 
TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            if (entities.isEmpty()) {
+                // return failure if the extension job doesn't exist
+                return new APIResult(APIResult.Status.SUCCEEDED,
+                        "Extension job " + jobName + " doesn't exist. Nothing 
to delete.");
+            }
+
+            for (Entity entity : entities) {
+                // TODO(yzheng): need to remember the entity dependency graph 
for clean ordered removal
+                canRemove(entity);
+                if (entity.getEntityType().isSchedulable() && 
!DeploymentUtil.isPrism()) {
+                    getWorkflowEngine(entity).delete(entity);
+                }
+                configStore.remove(entity.getEntityType(), entity.getName());
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when deleting extension job: " + jobName + ": ", 
e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + 
jobName + " deleted successfully");
+    }
+
+    @POST
+    @Path("submit/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA,
+            MediaType.APPLICATION_OCTET_STREAM})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult submit(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser,
+            @QueryParam("jobName") String jobName,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+            @FormDataParam("config") InputStream config) {
+        checkIfExtensionServiceIsEnabled();
+        Map<EntityType, List<Entity>> entityMap;
+
+        try {
+            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
+            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
+        } catch (FalconException | IOException e) {
+            LOG.error("Error while submitting extension job: ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted successfully:" + jobName);
+    }
+
+    private Map<EntityType, List<Entity>> getEntityList(String extensionName, 
String jobName,
+                                                        List<FormDataBodyPart> 
feedForms,
+                                                        List<FormDataBodyPart> 
processForms, InputStream config)
+        throws FalconException, IOException{
+        List<Entity> processes = getProcesses(processForms);
+        List<Entity> feeds = getFeeds(feedForms);
+        ExtensionType extensionType = getExtensionType(extensionName);
+        List<Entity> entities;
+        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
+        if (ExtensionType.TRUSTED.equals(extensionType)) {
+            entities = generateEntities(extensionName, config);
+            List<Entity> trustedFeeds = new ArrayList<>();
+            List<Entity> trustedProcesses = new ArrayList<>();
+            for (Entity entity : entities) {
+                if (EntityType.FEED.equals(entity.getEntityType())) {
+                    trustedFeeds.add(entity);
+                } else {
+                    trustedProcesses.add(entity);
+                }
+            }
+            entityMap.put(EntityType.PROCESS, trustedProcesses);
+            entityMap.put(EntityType.FEED, trustedFeeds);
+            return entityMap;
+        } else {
+            EntityUtil.applyTags(extensionName, jobName, processes);
+            EntityUtil.applyTags(extensionName, jobName, feeds);
+            entityMap.put(EntityType.PROCESS, processes);
+            entityMap.put(EntityType.FEED, feeds);
+            return entityMap;
+        }
+    }
+
+
+    private ExtensionType getExtensionType(String extensionName) {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
+        return extensionDetails.getExtensionType();
+    }
+
+    @POST
+    @Path("submitAndSchedule/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.MULTIPART_FORM_DATA})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult submitAndSchedule(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser,
+            @QueryParam("jobName") String jobName,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+            @FormDataParam("config") InputStream config) {
+        checkIfExtensionServiceIsEnabled();
+        Map<EntityType, List<Entity>> entityMap;
+        try {
+            entityMap = getEntityList(extensionName, jobName, feedForms, 
processForms, config);
+            submitEntities(extensionName, doAsUser, jobName, entityMap, 
config);
+            scheduleEntities(entityMap);
+        } catch (FalconException | IOException e) {
+            LOG.error("Error while submitting extension job: ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job 
submitted and scheduled successfully");
+    }
+
+    private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) {
+        List<Entity> feeds = new ArrayList<>();
+        if (feedForms != null && !feedForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : feedForms) {
+                feeds.add(formDataBodyPart.getValueAs(Feed.class));
+            }
+        }
+        return feeds;
+    }
+
+    private List<Entity> getProcesses(List<FormDataBodyPart> processForms) {
+        List<Entity> processes = new ArrayList<>();
+        if (processForms != null && !processForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : processForms) {
+                processes.add(formDataBodyPart.getValueAs(Process.class));
+            }
+        }
+        return processes;
+    }
+
+    protected void submitEntities(String extensionName, String doAsUser, 
String jobName,
+                                  Map<EntityType, List<Entity>> entityMap, 
InputStream configStream)
+        throws FalconException, IOException {
+        List<Entity> feeds = entityMap.get(EntityType.FEED);
+        List<Entity> processes = entityMap.get(EntityType.PROCESS);
+        validateFeeds(feeds);
+        validateProcesses(processes);
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for (Entity feed : feeds) {
+            submitInternal(feed, doAsUser);
+            feedNames.add(feed.getName());
+        }
+        for (Entity process: processes) {
+            submitInternal(process, doAsUser);
+            processNames.add(process.getName());
+        }
+
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
+        metaStore.storeExtensionJob(jobName, extensionName, feedNames, 
processNames, configBytes);
+    }
+
+    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) 
throws FalconException,
+            AuthorizationException {
+        for (Object feed: entityMap.get(EntityType.FEED)) {
+            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), 
null, null);
+        }
+        for (Object process: entityMap.get(EntityType.PROCESS)) {
+            scheduleInternal(EntityType.PROCESS.name(), 
((Process)process).getName(), null, null);
+        }
+    }
+
+
+    private void validateFeeds(List<Entity> feeds) throws FalconException {
+        for (Entity feed : feeds) {
+            super.validate(feed);
+        }
+    }
+
+    private void validateProcesses(List<Entity> processes) throws 
FalconException {
+        ProcessEntityParser processEntityParser = new ProcessEntityParser();
+        for (Entity process : processes) {
+            processEntityParser.validate((Process)process, false);
+        }
+    }
+
+    @POST
+    @Path("update/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult update(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            List<Entity> entities = generateEntities(extensionName, 
request.getInputStream());
+            for (Entity entity : entities) {
+                super.update(entity, entity.getEntityType().name(), 
entity.getName(), null);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when updating extension job: ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Updated 
successfully");
+    }
+
+    @POST
+    @Path("validate/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, 
MediaType.APPLICATION_JSON})
+    public APIResult validate(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        checkIfExtensionServiceIsEnabled();
+        ExtensionType extensionType = getExtensionType(extensionName);
+        if (!ExtensionType.TRUSTED.equals(extensionType)) {
+            throw FalconWebException.newAPIException("Extension validation is 
supported only for trusted extensions");
+        }
+        try {
+            List<Entity> entities = generateEntities(extensionName, 
request.getInputStream());
+            for (Entity entity : entities) {
+                super.validate(entity);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when validating extension job: ", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Validated 
successfully");
+    }
+
+
+    // Extension store related REST API's
+    @GET
+    @Path("enumerate")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getExtensions() {
+        checkIfExtensionServiceIsEnabled();
+        JSONArray results;
+
+        try {
+            results = buildEnumerateResult();
+        } catch (StoreAccessException e) {
+            LOG.error("Failed when accessing extension store.", e);
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        } catch (FalconException e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+
+        try {
+            JSONObject response = new JSONObject();
+            response.put(EXTENSION_RESULTS, results);
+            response.put(TOTAL_RESULTS, results.length());
+
+            return Response.ok(response).build();
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("describe/{extension-name}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String getExtensionDescription(
+            @PathParam("extension-name") String extensionName) {
+        checkIfExtensionServiceIsEnabled();
+        validateExtensionName(extensionName);
+        try {
+            return ExtensionStore.get().getResource(extensionName, README);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("detail/{extension-name}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getDetail(@PathParam("extension-name") String 
extensionName) {
+        checkIfExtensionServiceIsEnabled();
+        validateExtensionName(extensionName);
+        try {
+            return 
Response.ok(buildExtensionDetailResult(extensionName)).build();
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("extensionJobDetails/{job-name}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public String getExtensionJobDetail(@PathParam("job-name") String jobName) 
{
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return  super.getExtensionJobDetail(jobName);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @POST
+    @Path("unregister/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces(MediaType.TEXT_PLAIN)
+    public String deleteExtensionMetadata(
+            @PathParam("extension-name") String extensionName){
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return super.deleteExtensionMetadata(extensionName);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @POST
+    @Path("register/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces(MediaType.TEXT_PLAIN)
+    public String registerExtensionMetadata(
+            @PathParam("extension-name") String extensionName,
+            @QueryParam("path") String path,
+            @QueryParam("description") String description) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return super.registerExtensionMetadata(extensionName, path, 
description, CurrentUser.getUser());
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("definition/{extension-name}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public String getExtensionDefinition(
+            @PathParam("extension-name") String extensionName) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return ExtensionStore.get().getResource(extensionName,
+                    extensionName.toLowerCase() + 
EXTENSION_PROPERTY_JSON_SUFFIX);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, 
Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private static JSONArray buildEnumerateResult() throws FalconException {
+        JSONArray results = new JSONArray();
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        List<ExtensionBean> extensionBeanList = metaStore.getAllExtensions();
+        for (ExtensionBean extensionBean : extensionBeanList) {
+            JSONObject resultObject = new JSONObject();
+
+            try {
+                resultObject.put(NAME, 
extensionBean.getExtensionName().toLowerCase());
+                resultObject.put(EXTENSION_TYPE, 
extensionBean.getExtensionType());
+                resultObject.put(EXTENSION_DESC, 
extensionBean.getDescription());
+                resultObject.put(EXTENSION_LOCATION, 
extensionBean.getLocation());
+            } catch (JSONException e) {
+                throw new FalconException(e);
+            }
+            results.put(resultObject);
+
+        }
+        return results;
+    }
+
+    private List<Entity> generateEntities(String extensionName, InputStream 
configStream)
+        throws FalconException, IOException {
+        // get entities for extension job
+        Properties properties = new Properties();
+        properties.load(configStream);
+        List<Entity> entities = extension.getEntities(extensionName, 
configStream);
+
+        // add tags on extension name and job
+        String jobName = 
properties.getProperty(ExtensionProperties.JOB_NAME.getName());
+        EntityUtil.applyTags(extensionName, jobName, entities);
+
+        return entities;
+    }
+
+    private JSONObject buildExtensionDetailResult(final String extensionName) 
throws FalconException {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+
+        if (!metaStore.checkIfExtensionExists(extensionName)){
+            throw new ValidationException("No extension resources found for " 
+ extensionName);
+        }
+
+        ExtensionBean bean = metaStore.getDetail(extensionName);
+        JSONObject resultObject = new JSONObject();
+        try {
+            resultObject.put(NAME, bean.getExtensionName());
+            resultObject.put(EXTENSION_TYPE, bean.getExtensionType());
+            resultObject.put(EXTENSION_DESC, bean.getDescription());
+            resultObject.put(EXTENSION_LOCATION, bean.getLocation());
+        } catch (JSONException e) {
+            LOG.error("Exception in buildDetailResults:", e);
+            throw new FalconException(e);
+        }
+        return resultObject;
+    }
+
+    private Map<String, List<Entity>> groupEntitiesByJob(List<Entity> 
entities) {
+        Map<String, List<Entity>> groupedEntities = new HashMap<>();
+        for (Entity entity : entities) {
+            String jobName = getJobNameFromTag(entity.getTags());
+            if (!groupedEntities.containsKey(jobName)) {
+                groupedEntities.put(jobName, new ArrayList<Entity>());
+            }
+            groupedEntities.get(jobName).add(entity);
+        }
+        return groupedEntities;
+    }
+
+    private static void checkIfExtensionServiceIsEnabled() {
+        if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
+            throw FalconWebException.newAPIException(
+                    ExtensionService.SERVICE_NAME + " is not enabled.", 
Response.Status.NOT_FOUND);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
 
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 316567e..26de20e 100644
--- 
a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ 
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -39,7 +39,7 @@ import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
-import org.apache.falcon.resource.extensions.ExtensionManager;
+import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.util.DeploymentUtil;
 
 import javax.servlet.http.HttpServletRequest;
@@ -403,7 +403,7 @@ public class SchedulableEntityManagerProxy extends 
AbstractSchedulableEntityMana
     private void entityHasExtensionJobTag(Entity entity) {
         String tags = entity.getTags();
         if (StringUtils.isNotBlank(tags)) {
-            String jobName = ExtensionManager.getJobNameFromTag(tags);
+            String jobName = AbstractExtensionManager.getJobNameFromTag(tags);
             if (StringUtils.isNotBlank(jobName)) {
                 throw FalconWebException.newAPIException("Entity has extension 
job name in the tag. Such entities need "
                         + "to be submitted as extension jobs:" + jobName);
@@ -413,7 +413,7 @@ public class SchedulableEntityManagerProxy extends 
AbstractSchedulableEntityMana
 
     private void checkExtensionJobExist(String tags) {
         if (tags != null) {
-            String jobName = ExtensionManager.getJobNameFromTag(tags);
+            String jobName = AbstractExtensionManager.getJobNameFromTag(tags);
             ExtensionMetaStore extensionMetaStore = 
ExtensionStore.getMetaStore();
             if (jobName != null && 
extensionMetaStore.checkIfExtensionJobExists(jobName)) {
                 throw FalconWebException.newAPIException("Entity operation is 
not allowed on this entity as it is"

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml 
b/prism/src/main/webapp/WEB-INF/web.xml
index c92b757..0a4b9f0 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -94,8 +94,7 @@
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
                 
org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,
-                
org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata,
-                org.apache.falcon.resource.extensions
+                
org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 67347cd..fae937a 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -56,7 +56,7 @@
 #org.apache.falcon.metadata.MetadataMappingService,\
 
 ##Add if you want to use Trusted or User Extensions
-## In case of distributed Mode enable ExtensionService only on Prism
+## In case of distributed Mode enable ExtensionService only on Prism via 
prism.application.services
 ## It should come after FalconJPAService in application services
 #org.apache.falcon.extensions.ExtensionService,\
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java 
b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index da486db..7f55f9a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -22,7 +22,8 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.extensions.ExtensionManager;
+import org.apache.falcon.resource.AbstractExtensionManager;
+import org.apache.falcon.security.CurrentUser;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,7 +33,7 @@ import java.util.Map;
 /**
  * A proxy implementation of the extension operations in local mode.
  */
-public class LocalExtensionManager extends ExtensionManager {
+public class LocalExtensionManager extends AbstractExtensionManager {
     public LocalExtensionManager() {}
 
     public APIResult submitExtensionJob(String extensionName, String jobName, 
InputStream config,
@@ -51,7 +52,7 @@ public class LocalExtensionManager extends ExtensionManager {
 
 
     public String registerExtensionMetadata(String extensionName, String 
packagePath , String description) {
-        return super.registerExtensionMetadata(extensionName, packagePath, 
description);
+        return super.registerExtensionMetadata(extensionName, packagePath, 
description, CurrentUser.getUser());
     }
 
     public String unRegisterExtension(String extensionName) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/cdb5404a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java 
b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
new file mode 100644
index 0000000..2160320
--- /dev/null
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource;
+
+import org.apache.falcon.FalconWebException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.POST;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * This class provides RESTful API for the extensions.
+ */
+@Path("extension")
+public class ExtensionManager {
+    public static final Logger LOG = 
LoggerFactory.getLogger(ExtensionManager.class);
+
+    @GET
+    @Path("enumerate")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getExtensions() {
+        LOG.error("Enumerate is not supported on Server.Please run your 
operation on Prism ");
+        throw FalconWebException.newAPIException("Enumerate is not supported 
on Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @GET
+    @Path("describe/{extension-name}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String getExtensionDescription(
+            @PathParam("extension-name") String extensionName) {
+        LOG.error("Describe is not supported on Server.Please run your 
operation on Prism ");
+        throw FalconWebException.newAPIException("Describe is not supported on 
Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @GET
+    @Path("detail/{extension-name}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getDetail(@PathParam("extension-name") String 
extensionName) {
+        LOG.error("Detail is not supported on Server.Please run your operation 
on Prism ");
+        throw FalconWebException.newAPIException("Detail is not supported on 
Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @POST
+    @Path("unregister/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces(MediaType.TEXT_PLAIN)
+    public String deleteExtensionMetadata(
+            @PathParam("extension-name") String extensionName){
+        LOG.error("Unregister is not supported on Server.Please run your 
operation on Prism ");
+        throw FalconWebException.newAPIException("Unregister is not supported 
on Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @GET
+    @Path("definition/{extension-name}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public String getExtensionDefinition(
+            @PathParam("extension-name") String extensionName) {
+        LOG.error("Definition is not supported on Server.Please run your 
operation on Prism ");
+        throw FalconWebException.newAPIException("Definition is not supported 
on Server. Please run your operation "
+                + "on Prism.");
+    }
+}

Reply via email to