Repository: falcon Updated Branches: refs/heads/master 3e7e08f77 -> 2f1aa291f
FALCON-2194 Enhanced validate API to support config validation for user extensions Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #314 from sandeepSamudrala/FALCON-2194 and squashes the following commits: 0313041 [sandeep] FALCON-2194 Incorporated Review comments 61a463c [sandeep] FALCON-2194 Enhancing validate API to support config validation for user extensions 8d58ef6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2194 53c680b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2194 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. 
Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2f1aa291 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2f1aa291 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2f1aa291 Branch: refs/heads/master Commit: 2f1aa291ffd1e62c8186638e62115c0df0c393a5 Parents: 3e7e08f Author: sandeep <[email protected]> Authored: Mon Dec 12 09:51:22 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Dec 12 09:51:22 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 2 +- .../org/apache/falcon/ExtensionHandler.java | 29 +++++++ .../falcon/client/AbstractFalconClient.java | 4 +- .../org/apache/falcon/client/FalconClient.java | 79 +++++++++++--------- .../resource/extensions/ExtensionManager.java | 4 + .../apache/falcon/unit/FalconUnitClient.java | 2 +- 6 files changed, 79 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 984b6a3..57871c3 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -113,7 +113,7 @@ public class FalconExtensionCLI { } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage(); + result = 
client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.scheduleExtensionJob(jobName, doAsUser).getMessage(); http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index c18a7cc..8168b23 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -27,6 +27,8 @@ import org.apache.falcon.extensions.ExtensionBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,8 @@ import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; +import static org.apache.falcon.client.FalconClient.OUT; + /** * Handler class that is responsible for preparing Extension entities. 
*/ @@ -50,6 +54,8 @@ public final class ExtensionHandler { public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class); private static final String UTF_8 = CharEncoding.UTF_8; private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir")); + private static final String LOCATION = "location"; + private static final String TYPE = "type"; public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName, InputStream configStream) throws IOException, FalconException { @@ -186,4 +192,27 @@ public final class ExtensionHandler { urls.add(fileURL); return urls; } + + + public static String getExtensionLocation(String extensionName, JSONObject extensionDetailJson) { + String extensionBuildPath; + try { + extensionBuildPath = extensionDetailJson.get(LOCATION).toString(); + } catch (JSONException e) { + OUT.get().print("Error. " + extensionName + " not found "); + throw new FalconCLIException("Failed to get extension type for the given extension"); + } + return extensionBuildPath; + } + + public static String getExtensionType(String extensionName, JSONObject extensionDetailJson) { + String extensionType; + try { + extensionType = extensionDetailJson.get(TYPE).toString(); + } catch (JSONException e) { + OUT.get().print("Error. 
" + extensionName + " not found "); + throw new FalconCLIException("Failed to get extension type for the given extension"); + } + return extensionType; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index e9a10fd..9659cfc 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -197,7 +197,7 @@ public abstract class AbstractFalconClient { public abstract String unregisterExtension(String extensionName); /** - * Prepare set of entities the extension has implemented and stage them to a local directory and submit them too. + * Prepares set of entities the extension has implemented and stage them to a local directory and submit them too. * @param extensionName extension which is available in the store. * @param jobName name to be used in all the extension entities' tagging that are built as part of * loadAndPrepare. @@ -209,7 +209,7 @@ public abstract class AbstractFalconClient { String doAsUser); /** - * Prepare set of entities the extension has implemented and stage them to a local directory and submits and + * Prepares set of entities the extension has implemented and stage them to a local directory and submits and * schedules them. * @param extensionName extension which is available in the store. 
* @param jobName name to be used in all the extension entities' tagging that are built as part of http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 9820686..7fa5330 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -23,8 +23,6 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.client.urlconnection.HTTPSProperties; -import com.sun.jersey.core.header.FormDataContentDisposition; -import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataMultiPart; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -141,7 +139,9 @@ public class FalconClient extends AbstractFalconClient { return true; } }; - private static final String TAG_SEPARATOR = ","; + private static final String FEEDS = "feeds"; + private static final String PROCESSES = "processes"; + private static final String CONFIG = "config"; private final WebResource service; private final AuthenticatedURL.Token authenticationToken; @@ -1073,16 +1073,16 @@ public class FalconClient extends AbstractFalconClient { List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName, configStream); FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); - for (Entity entity : entities) { - if (EntityType.FEED.equals(entity.getEntityType())) { - formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE); - } else if (EntityType.PROCESS.equals(entity.getEntityType())) { - formDataMultiPart.field("processes", entity, 
MediaType.APPLICATION_XML_TYPE); + if (entities != null && !entities.isEmpty()) { + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + formDataMultiPart.field(FEEDS, entity, MediaType.APPLICATION_XML_TYPE); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + formDataMultiPart.field(PROCESSES, entity, MediaType.APPLICATION_XML_TYPE); + } } } - - formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream, - MediaType.APPLICATION_OCTET_STREAM_TYPE)); + formDataMultiPart.field(CONFIG, configStream, MediaType.APPLICATION_OCTET_STREAM_TYPE); try { formDataMultiPart.close(); } catch (IOException e) { @@ -1094,30 +1094,28 @@ public class FalconClient extends AbstractFalconClient { private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName, InputStream configStream) { - ClientResponse clientResponse = getExtensionDetailResponse(extensionName); - List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse); + JSONObject extensionDetailJson = getExtensionDetailJson(extensionName); + String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson); + String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson); + List<Entity> entities = getEntities(extensionName, jobName, configStream, extensionType, + extensionBuildLocation); return entities; } - private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream, - ClientResponse clientResponse) { - JSONObject responseJson; - try { - responseJson = new JSONObject(clientResponse.getEntity(String.class)); - } catch (JSONException e) { - OUT.get().print("Submit failed. Failed to get details for the given extension"); - throw new FalconCLIException("Submit failed. 
Failed to get details for the given extension"); - } - String extensionType; - String extensionBuildLocation; + private JSONObject getExtensionDetailJson(String extensionName) { + ClientResponse clientResponse = getExtensionDetailResponse(extensionName); + JSONObject extensionDetailJson; try { - extensionType = responseJson.get("type").toString(); - extensionBuildLocation = responseJson.get("location").toString(); + extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class)); } catch (JSONException e) { - OUT.get().print("Error. " + extensionName + " not found "); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + OUT.get().print("Failed to get details for the given extension"); + throw new FalconCLIException("Failed to get details for the given extension"); } + return extensionDetailJson; + } + private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream, + String extensionType, String extensionBuildLocation) { List<Entity> entities = null; if (!extensionType.equals(ExtensionType.CUSTOM.name())) { try { @@ -1125,11 +1123,11 @@ public class FalconClient extends AbstractFalconClient { extensionBuildLocation); } catch (Exception e) { OUT.get().println("Error in building the extension"); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + throw new FalconCLIException("Failed to prepare entities for the given extension"); } if (entities == null || entities.isEmpty()) { OUT.get().println("No entities got built"); - throw new FalconCLIException("Submit failed. 
Failed to get details for the given extension"); + throw new FalconCLIException("Failed to prepare entities for the given extension"); } } return entities; @@ -1154,13 +1152,20 @@ public class FalconClient extends AbstractFalconClient { return getResponse(APIResult.class, clientResponse); } - public APIResult validateExtensionJob(final String extensionName, final String filePath, final String doAsUser) { - InputStream entityStream = getServletInputStream(filePath); - ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.VALIDATE.path, extensionName) - .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.VALIDATE, entityStream); - return getResponse(APIResult.class, clientResponse); + public APIResult validateExtensionJob(final String extensionName, final String jobName, + final String configPath, final String doAsUser) { + String extensionType = ExtensionHandler.getExtensionType(extensionName, getExtensionDetailJson(extensionName)); + InputStream configStream = getServletInputStream(configPath); + if (ExtensionType.TRUSTED.name().equalsIgnoreCase(extensionType)) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.VALIDATE.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.VALIDATE, configStream); + return getResponse(APIResult.class, clientResponse); + } else { + validateExtensionAndGetEntities(extensionName, jobName, configStream); + return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully"); + } } public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 241af31..47ee737 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -487,6 +487,10 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); + ExtensionType extensionType = getExtensionType(extensionName); + if (!ExtensionType.TRUSTED.equals(extensionType)) { + throw FalconWebException.newAPIException("Extension validation is supported only for trusted extensions"); + } try { List<Entity> entities = generateEntities(extensionName, request.getInputStream()); for (Entity entity : entities) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 0eb0ab3..4da9f73 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -316,7 +316,7 @@ public class FalconUnitClient extends AbstractFalconClient { entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, packagePath); } catch (FalconException | IOException e) { - throw new FalconCLIException("Failed in generating entties" + jobName); + throw new FalconCLIException("Failed in generating entities" + jobName); } return entities; }
