Repository: falcon Updated Branches: refs/heads/master 51a09451e -> c79e5e4d3
FALCON-2201 Falcon Unit changes for extension support and falcon unit tests for extensions and fixes. Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #308 from sandeepSamudrala/FALCON-2201 and squashes the following commits: 2bd685f [sandeep] FALCON-2201 Fixed checkstyle issues c7422e6 [sandeep] FALCON-2201 Incorporated review comments. Removed applying tags from client. I will move that to Server side in the server side changes 86446ad [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 432cdfd [sandeep] FALCON-2201 Incorporated review comments a0ce5e0 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 519a877 [sandeep] FALCON-2201 Fixed checkstyle issues c101c7b [sandeep] FALCON-2201 Incorporated review comments bf0e6ed [sandeep] FALCON-2201 Incorporated review comments and few client side changes adfd318 [sandeep] FALCON-2201 Falcon Unit changes for extension support and falcon unit tests for extensions and fixes. 03f0c3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2201 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c79e5e4d Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c79e5e4d Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c79e5e4d Branch: refs/heads/master Commit: c79e5e4d3a68edb021db28c68f6797c2a3641efe Parents: 51a0945 Author: sandeep <[email protected]> Authored: Fri Dec 2 16:50:42 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Dec 2 16:50:42 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 4 +- client/pom.xml | 5 + .../org/apache/falcon/ExtensionHandler.java | 4 +- .../falcon/client/AbstractFalconClient.java | 51 +++++++ .../org/apache/falcon/client/FalconClient.java | 137 +++++++++++-------- .../apache/falcon/extensions/ExtensionType.java | 2 +- .../extensions/jdbc/ExtensionMetaStore.java | 8 +- .../falcon/extensions/store/ExtensionStore.java | 14 +- .../org/apache/falcon/ExtensionExample.java | 4 +- .../falcon/extensions/ExtensionServiceTest.java | 2 +- .../store/AbstractTestExtensionStore.java | 2 +- ...rg.apache.falcon.extensions.ExtensionBuilder | 2 +- .../src/test/resources/extension-example.xml | 42 ++++++ extensions/src/test/resources/process.xml | 59 -------- .../resource/extensions/ExtensionManager.java | 13 +- .../jdbc/MonitoringJdbcStateStoreTest.java | 2 +- .../service/EntitySLAAlertServiceTest.java | 2 +- .../java/org/apache/falcon/unit/FalconUnit.java | 43 ++++-- .../apache/falcon/unit/FalconUnitClient.java | 57 +++++++- .../falcon/unit/LocalExtensionManager.java | 60 ++++++++ unit/src/main/resources/startup.properties | 4 +- .../org/apache/falcon/ExtensionHandlerTest.java | 66 +++++++++ .../org/apache/falcon/ExtensionUtilTest.java | 66 --------- .../apache/falcon/unit/FalconUnitTestBase.java | 19 +++ .../org/apache/falcon/unit/TestFalconUnit.java | 93 +++++++++++++ .../resources/extension-example-duplicate.xml | 42 ++++++ .../falcon/process/TableStorageProcessIT.java | 2 +- 27 files changed, 583 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 15b1b32..9b88abe 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -92,8 +92,10 @@ public class FalconExtensionCLI { }else if (optionsList.contains(DETAIL_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.getExtensionDetail(extensionName); + result = prettyPrintJson(result); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + validateRequiredParameter(jobName, JOB_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(REGISTER_OPT)) { @@ -103,7 +105,7 @@ public class FalconExtensionCLI { } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage(); + result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index b8647f9..53a71ef 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -82,6 +82,11 @@ </dependency> <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index 80df791..c18a7cc 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -113,8 +113,8 @@ public final class ExtensionHandler { type = entity.getEntityType(); OutputStream out; try { - entityFile = new File(stagePath + File.separator + entity.getEntityType().toString() + "_" - + URLEncoder.encode(entity.getName(), UTF_8)); + entityFile = new File(new Path(stagePath + File.separator + entity.getEntityType().toString() + "_" + + URLEncoder.encode(entity.getName(), UTF_8)).toUri().toURL().getPath()); if (!entityFile.createNewFile()) { LOG.debug("Not able to stage the entities in the tmp path"); return; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 01dd6c6..e9a10fd 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -33,6 +33,8 @@ import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.falcon.resource.TriageResult; import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; @@ -179,6 +181,22 @@ public abstract class AbstractFalconClient { String properties); /** + * Registers an extension. + * @param extensionName extensionName of the extension. + * @param packagePath Package location for the extension. + * @param description description of the extension. + * @return Result of the registerExtension command. + */ + public abstract String registerExtension(String extensionName, String packagePath, String description); + + /** + * + * @param extensionName extensionName that needs to be unregistered + * @return Result of the unregisterExtension operation + */ + public abstract String unregisterExtension(String extensionName); + + /** * Prepare set of entities the extension has implemented and stage them to a local directory and submit them too. * @param extensionName extension which is available in the store. * @param jobName name to be used in all the extension entities' tagging that are built as part of @@ -191,6 +209,19 @@ public abstract class AbstractFalconClient { String doAsUser); /** + * Prepare set of entities the extension has implemented and stage them to a local directory and submits and + * schedules them. + * @param extensionName extension which is available in the store. + * @param jobName name to be used in all the extension entities' tagging that are built as part of + * loadAndPrepare. + * @param configPath path to extension parameters. + * @return + * @throws FalconCLIException + */ + public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, + String doAsUser); + + /** * * Get list of the entities. * We have two filtering parameters for entity tags: "tags" and "tagkeys". @@ -468,6 +499,26 @@ public abstract class AbstractFalconClient { return (buffer.length() == 0) ? null : stream; } + /** + * Converts a InputStream into ServletInputStream. + * + * @param filePath - Path of file to stream + * @return ServletInputStream + */ + protected InputStream getServletInputStream(String filePath) { + + if (filePath == null) { + return null; + } + InputStream stream; + try { + stream = new FileInputStream(filePath); + } catch (FileNotFoundException e) { + throw new FalconCLIException("File not found:", e); + } + return stream; + } + public abstract SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName, String start, String end, String colo); http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 4d4517c..9820686 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -23,13 +23,14 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.client.urlconnection.HTTPSProperties; +import com.sun.jersey.core.header.FormDataContentDisposition; +import com.sun.jersey.multipart.FormDataBodyPart; +import com.sun.jersey.multipart.FormDataMultiPart; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.util.TrustManagerUtils; -import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.ExtensionHandler; -import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.DateValidator; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -64,8 +65,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; @@ -124,6 +123,7 @@ public class FalconClient extends AbstractFalconClient { public static final String DO_AS_OPT = "doAs"; + public static final String JOB_NAME_OPT = "jobName"; public static final String ENTITIES_OPT = "entities"; /** * Name of the HTTP cookie used for the authentication token between the client and the server. @@ -747,26 +747,6 @@ public class FalconClient extends AbstractFalconClient { return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null, doAsUser); } - /** - * Converts a InputStream into ServletInputStream. - * - * @param filePath - Path of file to stream - * @return ServletInputStream - */ - private InputStream getServletInputStream(String filePath) { - - if (filePath == null) { - return null; - } - InputStream stream; - try { - stream = new FileInputStream(filePath); - } catch (FileNotFoundException e) { - throw new FalconCLIException("File not found:", e); - } - return stream; - } - private <T> T getResponse(Class<T> clazz, ClientResponse clientResponse) { printClientResponse(clientResponse); checkIfSuccessful(clientResponse); @@ -872,6 +852,12 @@ public class FalconClient extends AbstractFalconClient { .accept(operation.mimeType).type(MediaType.TEXT_XML) .method(operation.method, ClientResponse.class, entityStream); } + + public ClientResponse call(ExtensionOperations submit, FormDataMultiPart formDataMultiPart) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(submit.mimeType).type(MediaType.MULTIPART_FORM_DATA) + .method(submit.method, ClientResponse.class, formDataMultiPart); + } } public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) { @@ -1040,14 +1026,17 @@ public class FalconClient extends AbstractFalconClient { } public String getExtensionDetail(final String extensionName) { - ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName) + return getResponse(String.class, getExtensionDetailResponse(extensionName)); + } + + public ClientResponse getExtensionDetailResponse(final String extensionName) { + return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName) .call(ExtensionOperations.DETAIL); - return getResponse(String.class, clientResponse); } - public String registerExtension(final String extensionName, final String path, final String description) { + public String registerExtension(final String extensionName, final String packagePath, final String description) { ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, path) + .path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, packagePath) .addQueryParam(FalconCLIConstants.DESCRIPTION, description) .call(ExtensionOperations.REGISTER); return getResponse(String.class, clientResponse); @@ -1070,55 +1059,89 @@ public class FalconClient extends AbstractFalconClient { @Override public APIResult submitExtensionJob(final String extensionName, final String jobName, final String configPath, final String doAsUser) { + FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath); ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.DETAIL.path) - .call(ExtensionOperations.DETAIL); - JSONObject responseJson = clientResponse.getEntity(JSONObject.class); - ExtensionType extensionType; + .path(ExtensionOperations.SUBMIT.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(JOB_NAME_OPT, jobName) + .call(ExtensionOperations.SUBMIT, entitiesForm); + return getResponse(APIResult.class, clientResponse); + } + + private FormDataMultiPart getEntitiesForm(String extensionName, String jobName, String configPath) { + InputStream configStream = getServletInputStream(configPath); + List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName, configStream); + FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); + + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE); + } + } + + formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream, + MediaType.APPLICATION_OCTET_STREAM_TYPE)); + try { + formDataMultiPart.close(); + } catch (IOException e) { + OUT.get().print("Submit failed. Failed to submit entities"); + throw new FalconCLIException("Submit failed. Failed to submit entities", e); + } + return formDataMultiPart; + } + + private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName, + InputStream configStream) { + ClientResponse clientResponse = getExtensionDetailResponse(extensionName); + List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse); + return entities; + } + + private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream, + ClientResponse clientResponse) { + JSONObject responseJson; + try { + responseJson = new JSONObject(clientResponse.getEntity(String.class)); + } catch (JSONException e) { + OUT.get().print("Submit failed. Failed to get details for the given extension"); + throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + } + String extensionType; String extensionBuildLocation; try { - JSONObject extensionDetailsJson = new JSONObject(responseJson.get("detail").toString()); - extensionType = ExtensionType.valueOf(extensionDetailsJson.get("type").toString().toUpperCase()); - extensionBuildLocation = extensionDetailsJson.get("location").toString(); + extensionType = responseJson.get("type").toString(); + extensionBuildLocation = responseJson.get("location").toString(); } catch (JSONException e) { OUT.get().print("Error. " + extensionName + " not found "); - return null; + throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); } - InputStream configStream = getServletInputStream(configPath); - List<Entity> entities; - if (extensionType.equals(ExtensionType.CUSTOM)) { + List<Entity> entities = null; + if (!extensionType.equals(ExtensionType.CUSTOM.name())) { try { - entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, extensionBuildLocation); + entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, + extensionBuildLocation); } catch (Exception e) { OUT.get().println("Error in building the extension"); - return null; + throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); } if (entities == null || entities.isEmpty()) { OUT.get().println("No entities got built"); - return null; - } - try { - EntityUtil.applyTags(extensionName, jobName, entities); - } catch (FalconException e) { - OUT.get().println("Error in applying tags to generated entities"); + throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); } } - - clientResponse = new ResourceBuilder() - .path(ExtensionOperations.SUBMIT.path, extensionName) - .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.SUBMIT, configStream); - return getResponse(APIResult.class, clientResponse); + return entities; } - public APIResult submitAndScheduleExtensionJob(final String extensionName, final String filePath, - final String doAsUser) { - InputStream entityStream = getServletInputStream(filePath); + public APIResult submitAndScheduleExtensionJob(final String extensionName, final String jobName, + final String configPath, final String doAsUser) { + FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath); ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName) .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entityStream); + .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm); return getResponse(APIResult.class, clientResponse); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/common-types/src/main/java/org/apache/falcon/extensions/ExtensionType.java ---------------------------------------------------------------------- diff --git a/common-types/src/main/java/org/apache/falcon/extensions/ExtensionType.java b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionType.java index c4621cc..3de37de 100644 --- a/common-types/src/main/java/org/apache/falcon/extensions/ExtensionType.java +++ b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionType.java @@ -27,7 +27,7 @@ public enum ExtensionType { private final String text; - private ExtensionType(final String text) { + ExtensionType(final String text) { this.text = text; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 5501146..456c97c 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -63,7 +63,13 @@ public class ExtensionMetaStore { beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION); q.setParameter(EXTENSION_NAME, extensionName); - if (q.getResultList().size() > 0){ + int resultSize = 0; + try { + resultSize = q.getResultList().size(); + } finally { + commitAndCloseTransaction(entityManager); + } + if (resultSize > 0){ return true; } return false; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 832d5b7..76196b7 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -260,7 +260,7 @@ public final class ExtensionStore { if (metaStore.checkIfExtensionExists(extensionName)) { metaStore.deleteExtension(extensionName); return "Deleted extension:" + extensionName; - }else { + } else { return "Extension:" + extensionName + " is not registered with Falcon."; } } @@ -269,7 +269,7 @@ public final class ExtensionStore { throws URISyntaxException, FalconException { Configuration conf = new Configuration(); URI uri = new URI(path); - conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority()); + conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority()); FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri); try { fileSystem.listStatus(new Path(uri.getPath() + "/README")); @@ -284,13 +284,13 @@ public final class ExtensionStore { }; FileStatus[] jarStatus; try { - jarStatus = fileSystem.listStatus(new Path(uri.getPath() + "/libs/build"), filter); + jarStatus = fileSystem.listStatus(new Path(uri.getPath(), "libs/build"), filter); if (jarStatus.length <=0) { - throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); + throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build."); } } catch (IOException e){ LOG.error("Exception in registering Extension:{}", extensionName, e); - throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); + throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build."); } FileStatus[] propStatus; try{ @@ -308,9 +308,9 @@ public final class ExtensionStore { if (!metaStore.checkIfExtensionExists(extensionName)){ metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description); }else{ - throw new ValidationException(extensionName + " already exsists."); + throw new ValidationException(extensionName + " already exists."); } - return "Extension :" + extensionName + " registered succesfully."; + return "Extension :" + extensionName + " registered successfully."; } public String getResource(final String extensionName, final String resourceName) throws StoreAccessException { http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/java/org/apache/falcon/ExtensionExample.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java index f527f2e..432e37b 100644 --- a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java +++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java @@ -33,7 +33,7 @@ import java.util.List; */ public class ExtensionExample implements ExtensionBuilder{ - public static final String PROCESS_XML = "/process.xml"; + public static final String PROCESS_XML = "/extension-example.xml"; @Override public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException { @@ -42,7 +42,7 @@ public class ExtensionExample implements ExtensionBuilder{ process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( getClass().getResourceAsStream(PROCESS_XML)); } catch (JAXBException e) { - throw new FalconException("Failed in unmarshalling the entity"); + throw new FalconException("Failed in un-marshalling the entity"); } List<Entity> entities = new ArrayList<>(); entities.add(process); http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java index a7c2fda..bbd6c18 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java @@ -49,7 +49,7 @@ public class ExtensionServiceTest extends AbstractTestExtensionStore { } @Test - public void testGetextensionStore() throws Exception { + public void testGetExtensionStore() throws Exception { Assert.assertNotNull(ExtensionService.getExtensionStore()); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java index 97fe287..e3ee484 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java @@ -47,7 +47,7 @@ public class AbstractTestExtensionStore { protected ExtensionStore store; private FileSystem fileSystem; protected LocalFileSystem fs = new LocalFileSystem(); - private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + private static final String DB_BASE_DIR = "target/test-data/persistenceDB"; protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder index a8d3cf8..d109378 100644 --- a/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder +++ b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder @@ -15,4 +15,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # - org.apache.falcon.ExtensionExample \ No newline at end of file +org.apache.falcon.ExtensionExample \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/resources/extension-example.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/extension-example.xml b/extensions/src/test/resources/extension-example.xml new file mode 100644 index 0000000..4a2a982 --- /dev/null +++ b/extensions/src/test/resources/extension-example.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<process name="sample" version="0" xmlns="uri:falcon:process:0.1"> + <tags>[email protected],[email protected],_department_type=forecasting</tags> + <pipelines>testPipeline</pipelines> + <clusters> + <cluster name="local"> + <validity start="2016-12-02T00:00Z" end="2091-12-30T00:00Z"/> + </cluster> + </clusters> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/> + <!-- how --> + <properties> + <property name="name1" value="value1"/> + <property name="name2" value="value2"/> + </properties> + + <workflow engine="oozie" path="/app/oozie-mr"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> + + <notification type="email" to="falcon@localhost"/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/extensions/src/test/resources/process.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/process.xml b/extensions/src/test/resources/process.xml deleted file mode 100644 index 48e3a16..0000000 --- a/extensions/src/test/resources/process.xml +++ /dev/null @@ -1,59 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<process name="sample" version="0" xmlns="uri:falcon:process:0.1"> - <tags>[email protected],[email protected],_department_type=forecasting</tags> - <pipelines>testPipeline</pipelines> - <clusters> - <cluster name="testCluster"> - <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/> - </cluster> - </clusters> - <parallel>1</parallel> - <order>LIFO</order> - <frequency>hours(1)</frequency> - <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/> - - <!-- what --> - <inputs> - <input name="impression" feed="impressionFeed" start="today(0,0)" end="today(2,0)" partition="*/US"/> - <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/> - </inputs> - - <outputs> - <output name="impOutput" feed="imp-click-join1" instance="today(0,0)"/> - <output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/> - </outputs> - - <!-- how --> - <properties> - <property name="name1" value="value1"/> - <property name="name2" value="value2"/> - </properties> - - <workflow engine="oozie" path="/falcon/test/workflow"/> - - <retry policy="periodic" delay="minutes(10)" attempts="3"/> - - <late-process policy="exp-backoff" delay="hours(1)"> - <late-input input="impression" workflow-path="himpression/late/workflow"/> - <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/> - </late-process> - - <notification type="email" to="falcon@localhost"/> -</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index cd1d4e2..79e3691 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -311,12 +311,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { LOG.error("Error when submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } - return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully"); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } private void validateEntities(List<Entity> entities) throws FalconException { for (Entity entity : entities) { - if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.FEED.equals(entity.getEntityType())) { + if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.PROCESS.equals(entity.getEntityType())) { LOG.error("Cluster entity is not allowed for submission via submitEntities: {}", entity.getName()); throw new FalconException("Cluster entity is not allowed for submission in extensions submission"); } @@ -355,7 +355,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - private void submitEntities(String extensionName, String doAsUser, String jobName, List<Entity> entities, + protected void submitEntities(String extensionName, String doAsUser, String jobName, List<Entity> entities, InputStream configStream) throws FalconException, IOException { validateEntities(entities); List<String> feeds = new ArrayList<>(); @@ -369,7 +369,10 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { } } ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); - byte[] configBytes = IOUtils.toByteArray(configStream); + byte[] configBytes = null; + if (configStream != null) { + configBytes = IOUtils.toByteArray(configStream); + } metaStore.storeExtensionJob(jobName, extensionName, feeds, processes, configBytes); } @@ -570,7 +573,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return entities; } - private JSONObject buildDetailResult(final String extensionName) throws FalconException { + private JSONObject buildDetailResult(final String extensionName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); if (!metaStore.checkIfExtensionExists(extensionName)){ http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 860fbfc..58a680e 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -48,7 +48,7 @@ import static org.apache.falcon.util.DateUtil.now; * */ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { - private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + private static final String DB_BASE_DIR = "target/test-data/persistenceDB"; protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java index 347e39a..784f3e9 100644 --- a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java +++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java @@ -48,7 +48,7 @@ import java.util.Date; * Test for SLA Alerts. */ public class EntitySLAAlertServiceTest extends AbstractTestBase { - private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + private static final String DB_BASE_DIR = "target/test-data/persistencedb"; protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java index e762b31..4a2ed62 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java @@ -22,9 +22,12 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.hadoop.JailedFileSystem; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.service.ServiceInitializer; +import org.apache.falcon.tools.FalconStateStoreDBCLI; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.util.StateStoreProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -40,6 +43,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -63,11 +67,17 @@ public final class FalconUnit { private static boolean isLocalMode; private static boolean isFalconUnitActive = false; + private static final String DB_BASE_DIR = "target/test-data/persistenceDB"; + protected static final String DB_LOCATION = DB_BASE_DIR + File.separator + "data.db"; + protected static final String URL = "jdbc:derby:"+ DB_LOCATION +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected static final LocalFileSystem LOCAL_FS = new LocalFileSystem(); + private FalconUnit() { } - public static synchronized void start(boolean isLocal) throws FalconException, IOException { + public static synchronized void start(boolean isLocal) throws Exception { if (isFalconUnitActive) { throw new IllegalStateException("Falcon Unit is already initialized"); } @@ -79,6 +89,7 @@ public final class FalconUnit { LOG.info("Initializing runtime properties ..."); RuntimeProperties.get(); + setupExtensionConfigs(); //Initializing Services STARTUP_SERVICES.initialize(); ConfigurationStore.get(); @@ -88,7 +99,24 @@ public final class FalconUnit { initFileSystem(); } isFalconUnitActive = true; + } + + public static void setupExtensionConfigs() throws Exception { + String configPath = new URI(StartupProperties.get().getProperty("config.store.uri")).getPath(); + String location = configPath + "-extensionSore"; + StartupProperties.get().setProperty("config.store.uri", location); + FileUtils.deleteDirectory(new File(location)); + StateStoreProperties.get().setProperty(FalconJPAService.URL, URL); + Configuration localConf = new Configuration(); + LOCAL_FS.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + LOCAL_FS.mkdirs(new Path(DB_BASE_DIR)); + createDB(DB_SQL_FILE); + } + public static void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = {"create", "-sqlfile", sqlFile.getAbsolutePath(), "-run"}; + new FalconStateStoreDBCLI().run(argsCreate); } private static void initFileSystem() throws IOException { @@ -106,13 +134,12 @@ public final class FalconUnit { String oozieLogsDir = oozieHomeDir + "/logs"; String oozieDataDir = oozieHomeDir + "/data"; - LocalFileSystem fs = new LocalFileSystem(); - fs.mkdirs(new Path(oozieHomeDir)); - fs.mkdirs(new Path(oozieConfDir)); - fs.mkdirs(new Path(oozieHadoopConfDir)); - fs.mkdirs(new Path(oozieActionConfDir)); - fs.mkdirs(new Path(oozieLogsDir)); - fs.close(); + LOCAL_FS.mkdirs(new Path(oozieHomeDir)); + LOCAL_FS.mkdirs(new Path(oozieConfDir)); + LOCAL_FS.mkdirs(new Path(oozieHadoopConfDir)); + LOCAL_FS.mkdirs(new Path(oozieActionConfDir)); + LOCAL_FS.mkdirs(new Path(oozieLogsDir)); + LOCAL_FS.close(); setSystemProperty("oozie.home.dir", oozieHomeDir); setSystemProperty("oozie.data.dir", oozieDataDir); http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 7248964..00c2ad1 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -18,6 +18,7 @@ package org.apache.falcon.unit; import org.apache.commons.lang.StringUtils; +import org.apache.falcon.ExtensionHandler; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.client.AbstractFalconClient; @@ -31,6 +32,7 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Validity; +import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; @@ -52,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; @@ -69,13 +72,14 @@ public class FalconUnitClient extends AbstractFalconClient { private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class); protected static final int XML_DEBUG_LEN = 10 * 1024; - private static final String DEFAULT_ORDERBY = "status"; + private static final String DEFAULT_ORDER_BY = "status"; private static final String DEFAULT_SORTED_ORDER = "asc"; protected ConfigurationStore configStore; private AbstractWorkflowEngine workflowEngine; private LocalSchedulableEntityManager localSchedulableEntityManager; private LocalInstanceManager localInstanceManager; + private LocalExtensionManager localExtensionManager; public FalconUnitClient() throws FalconException { @@ -83,6 +87,7 @@ public class FalconUnitClient extends AbstractFalconClient { workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); localSchedulableEntityManager = new LocalSchedulableEntityManager(); localInstanceManager = new LocalInstanceManager(); + localExtensionManager = new LocalExtensionManager(); } public ConfigurationStore getConfigStore() { @@ -167,7 +172,7 @@ public class FalconUnitClient extends AbstractFalconClient { String sortOrder, Integer offset, Integer numResults, String doAsUser, Boolean allAttempts) { if (orderBy == null) { - orderBy = DEFAULT_ORDERBY; + orderBy = DEFAULT_ORDER_BY; } if (sortOrder == null) { sortOrder = DEFAULT_SORTED_ORDER; @@ -183,7 +188,6 @@ public class FalconUnitClient extends AbstractFalconClient { } - /** * Schedules an submitted process entity immediately. * @@ -266,9 +270,50 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override + public String registerExtension(String extensionName, String packagePath, String description) { + return localExtensionManager.registerExtensionMetadata(extensionName, packagePath, description); + } + + @Override + public String unregisterExtension(String extensionName) { + return localExtensionManager.unRegisterExtension(extensionName); + } + + @Override public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { - //TODO Make falcon unit client changes for submitting recipe too. - throw new UnsupportedOperationException("Not yet Implemented"); + + InputStream configStream = getServletInputStream(configPath); + try { + List<Entity> entities = getEntities(extensionName, jobName, configStream); + return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entities); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed in submitting extension job " + jobName); + } + } + + private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream) { + String packagePath = ExtensionStore.get().getMetaStore().getDetail(extensionName).getLocation(); + List<Entity> entities; + try { + entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, + packagePath); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed in generating entties" + jobName); + } + return entities; + } + + @Override + public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, + String doAsUser) { + InputStream configStream = getServletInputStream(configPath); + try { + List<Entity> entities = getEntities(extensionName, jobName, configStream); + return localExtensionManager.submitAndSchedulableExtensionJob(extensionName, jobName, configStream, + entities); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed in submitting extension job " + jobName); + } } @Override @@ -327,7 +372,7 @@ public class FalconUnitClient extends AbstractFalconClient { String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, String doAsUser) { if (StringUtils.isBlank(orderBy)) { - orderBy = DEFAULT_ORDERBY; + orderBy = DEFAULT_ORDER_BY; } if (StringUtils.isBlank(sortOrder)) { sortOrder = DEFAULT_SORTED_ORDER; http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java new file mode 100644 index 0000000..5d2710c --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.unit; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.extensions.ExtensionManager; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +/** + * A proxy implementation of the extension operations in local mode. + */ +public class LocalExtensionManager extends ExtensionManager { + public LocalExtensionManager() {} + + public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, List<Entity> entities) + throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entities, config); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); + } + + public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream config, + List<Entity> entities) throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entities, config); + for (Entity entity : entities) { + scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); + } + + + public String registerExtensionMetadata(String extensionName, String packagePath , String description) { + return super.registerExtensionMetadata(extensionName, packagePath, description); + } + + public String unRegisterExtension(String extensionName) { + return super.deleteExtensionMetadata(extensionName); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties index 0e404cc..99c8165 100644 --- a/unit/src/main/resources/startup.properties +++ b/unit/src/main/resources/startup.properties @@ -34,9 +34,10 @@ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.service.FalconJPAService,\ + org.apache.falcon.extensions.ExtensionService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ - org.apache.falcon.rerun.service.LateRunService,\ + org.apache.falcon.rerun.service.LateRunService ##### Falcon Configuration Store Change listeners ##### *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ @@ -63,6 +64,7 @@ debug.libext.process.paths=${falcon.libext} *.falcon.cleanup.service.frequency=minutes(5) +*.extension.store.uri=file://${user.dir}/target/extensions ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/test/java/org/apache/falcon/ExtensionHandlerTest.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/ExtensionHandlerTest.java b/unit/src/test/java/org/apache/falcon/ExtensionHandlerTest.java new file mode 100644 index 0000000..03294bc --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/ExtensionHandlerTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * Test Class for validating Extension util helper methods. + */ +public class ExtensionHandlerTest { + public static final String PROCESS_XML = "/extension-example-duplicate.xml"; + public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; + public static final String CONFIG_PATH = "file:///" + System.getProperty("user.dir") + + "/src/test/resources/extension.properties"; + + @Test + public void testPrepareAndSetEntityTags() throws Exception { + Entity process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(PROCESS_XML)); + EntityUtil.setEntityTags(process, "testTag"); + Assert.assertTrue(EntityUtil.getTags(process).contains("testTag")); + + List<URL> urls = new ArrayList<>(); + + InputStream configStream = null; + try { + configStream = new FileInputStream(CONFIG_PATH); + } catch (FileNotFoundException e) { + //ignore + } + + urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL())); + List<Entity> entities = ExtensionHandler.prepare("extensionName", "jobName", configStream, urls); + Assert.assertEquals(entities.size(), 1); + Assert.assertEquals(entities.get(0), process); + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java b/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java deleted file mode 100644 index 7e931d7..0000000 --- a/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon; - -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -/** - * Test Class for validating Extension util helper methods. - */ -public class ExtensionUtilTest { - public static final String PROCESS_XML = "/process.xml"; - public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; - public static final String CONFIG_PATH = "file:///" + System.getProperty("user.dir") - + "/src/test/resources/extension.properties"; - - @Test - public void testPrepareAndSetEntityTags() throws Exception { - Entity process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( - getClass().getResourceAsStream(PROCESS_XML)); - EntityUtil.setEntityTags(process, "testTag"); - Assert.assertTrue(EntityUtil.getTags(process).contains("testTag")); - - List<URL> urls = new ArrayList<>(); - - InputStream configStream = null; - try { - configStream = new FileInputStream(CONFIG_PATH); - } catch (FileNotFoundException e) { - //ignore - } - - urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL())); - List<Entity> entities = ExtensionHandler.prepare("extensionName", "jobName", configStream, urls); - Assert.assertEquals(entities.size(), 1); - Assert.assertEquals(entities.get(0), process); - } -} - http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index bfc8b08..4ed7161 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -218,6 +218,25 @@ public class FalconUnitTestBase { return props; } + public String registerExtension(String extensionName, String packagePath, String description) + throws IOException, FalconException { + + return falconUnitClient.registerExtension(extensionName, packagePath, description); + } + + public String unregisterExtension(String extensionName) { + return falconUnitClient.unregisterExtension(extensionName); + } + + public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { + return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser); + } + + public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, + String doAsUser) { + return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser); + } + public static String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException { File tmpFile = getTempFile(); http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 0bc7755..3555b22 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -29,11 +29,14 @@ import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import javax.persistence.EntityManager; +import javax.persistence.Query; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -61,6 +64,8 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String END_TIME = "2013-11-18T00:07Z"; private static final String WORKFLOW = "workflow.xml"; private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; + private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension"; + public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; @Test public void testProcessInstanceExecution() throws Exception { @@ -380,4 +385,92 @@ public class TestFalconUnit extends FalconUnitTestBase { Assert.assertNotNull(summaryResult.getEntitySummaries()); Assert.assertEquals(summaryResult.getEntitySummaries().length, 1); } + + private void clearDB() { + EntityManager em = FalconJPAService.get().getEntityManager(); + em.getTransaction().begin(); + try { + Query query = em.createNativeQuery("delete from EXTENSIONS"); + query.executeUpdate(); + query = em.createNativeQuery("delete from EXTENSION_JOBS"); + query.executeUpdate(); + } finally { + em.getTransaction().commit(); + em.close(); + } + } + + @Test + public void testRegisterAndUnregisterExtension() throws Exception { + clearDB(); + submitCluster(); + createExtensionPackage(); + + String result = registerExtension("testExtension", new Path(EXTENSION_PATH).toString(), "testExtension"); + Assert.assertEquals(result, "Extension :testExtension registered successfully."); + + result = unregisterExtension("testExtension"); + Assert.assertEquals(result, "Deleted extension:testExtension"); + } + + @Test + public void testSubmitAndScheduleExtensionJob() throws Exception { + clearDB(); + submitCluster(); + createExtensionPackage(); + String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); + String result = registerExtension("testExtension", EXTENSION_PATH, "testExtension"); + Assert.assertEquals(result, "Extension :testExtension registered successfully."); + + createDir(PROCESS_APP_PATH); + copyExtensionJar(packageBuildLib); + APIResult apiResult = submitAndScheduleExtensionJob("testExtension", "testJob", null, null); + assertStatus(apiResult); + + apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); + assertStatus(apiResult); + Assert.assertEquals(apiResult.getMessage(), "RUNNING"); + } + + + void copyExtensionJar(String destDirPath) throws IOException { + File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath()); + for (File file : dir.listFiles()) { + if (file.toString().endsWith(".jar")) { + fs.copyFromLocalFile(new Path(file.getPath()), + new Path(destDirPath, file.getName())); + } + } + // empty jar file , hence deleting it and copying the actual build package jar. + fs.delete(new Path(EXTENSION_PATH, "libs/build/test.jar"), true); + } + + private void createExtensionPackage() throws IOException{ + Path basePath = new Path(EXTENSION_PATH); + if (fs.exists(basePath)){ + fs.delete(basePath, true); + } + Path buildLibs = new Path(EXTENSION_PATH, "libs/build"); + fs.mkdirs(buildLibs); + fs.create(new Path(buildLibs, "test.jar")); + fs.close(); + fs.mkdirs(new Path(EXTENSION_PATH, "libs/runtime")); + + Path readMePath = new Path(EXTENSION_PATH, "README"); + if (fs.exists(readMePath)) { + fs.delete(readMePath, true); + } + fs.create(readMePath); + fs.close(); + Path metaPath = new Path(EXTENSION_PATH , "META"); + if (fs.exists(metaPath)){ + fs.delete(metaPath, true); + } + fs.mkdirs(metaPath); + fs.create(new Path(metaPath, "config")); + fs.close(); + + fs.mkdirs(new Path(EXTENSION_PATH, "resources/build")); + fs.mkdirs(new Path(EXTENSION_PATH, "resources/runtime")); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/unit/src/test/resources/extension-example-duplicate.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/extension-example-duplicate.xml b/unit/src/test/resources/extension-example-duplicate.xml new file mode 100644 index 0000000..4a2a982 --- /dev/null +++ b/unit/src/test/resources/extension-example-duplicate.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<process name="sample" version="0" xmlns="uri:falcon:process:0.1"> + <tags>[email protected],[email protected],_department_type=forecasting</tags> + <pipelines>testPipeline</pipelines> + <clusters> + <cluster name="local"> + <validity start="2016-12-02T00:00Z" end="2091-12-30T00:00Z"/> + </cluster> + </clusters> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/> + <!-- how --> + <properties> + <property name="name1" value="value1"/> + <property name="name2" value="value2"/> + </properties> + + <workflow engine="oozie" path="/app/oozie-mr"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> + + <notification type="email" to="falcon@localhost"/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/c79e5e4d/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java index 8c4047f..9585360 100644 --- a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java +++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java @@ -47,7 +47,7 @@ import java.util.List; import java.util.Map; /** - * Integration tests for Processing Engines, Pig & Hive with both FS and table storage. + * Integration tests for Processing Engines, Pig & Hive with both fs and table storage. * * This test is disabled as it heavily depends on oozie sharelibs for * pig and hcatalog being made available on HDFS. captured in FALCON-139.
