Repository: falcon Updated Branches: refs/heads/master 1b7708fa1 -> 3f5087997
FALCON-2185 Falcon Client changes for Falcon user extensions. Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #306 from sandeepSamudrala/FALCON-2185 and squashes the following commits: 466705f [sandeep] FALCON-2185 Incorporated review comments.Made stage entities private method 2a3e61d [sandeep] FALCON-2185 Incorporated more review comments e3516c2 [sandeep] FALCON-2185 Incorporated review comments cfe6c57 [sandeep] FALCON-2185 Moved UTs to falcon unit and example to extensions ebac5bb [sandeep] FALCON-2185 Falcon Client changes for Falcon user extensions d680244 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185 8b2e0d9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185 2fd05bb [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185 fc7e9a1 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185 8aacd75 [sandeep] FALCON-2183 Incorporated review comments f3d7268 [sandeep] FALCON-2183 Incorporated review comments 11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3f508799 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3f508799 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3f508799 Branch: refs/heads/master Commit: 3f50879971453f4ced61fa24c4f4a425cbf2631e Parents: 1b7708f Author: sandeep <[email protected]> Authored: Fri Nov 25 15:54:56 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Nov 25 15:54:56 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 32 ++-- client/pom.xml | 4 + .../org/apache/falcon/ExtensionClassLoader.java | 61 ++++++ .../org/apache/falcon/ExtensionHandler.java | 189 +++++++++++++++++++ .../falcon/client/AbstractFalconClient.java | 12 ++ .../org/apache/falcon/client/FalconClient.java | 50 ++++- .../falcon/client/FalconExtensionConstants.java | 34 ++++ .../org/apache/falcon/entity/EntityUtil.java | 46 +++++ .../site/twiki/falconcli/ExtensionSubmit.twiki | 2 +- .../falconcli/ExtensionSubmitAndSchedule.twiki | 2 +- .../org/apache/falcon/ExtensionExample.java | 66 +++++++ ...rg.apache.falcon.extensions.ExtensionBuilder | 18 ++ extensions/src/test/resources/process.xml | 59 ++++++ .../resource/extensions/ExtensionManager.java | 40 +--- unit/pom.xml | 23 +++ .../apache/falcon/unit/FalconUnitClient.java | 6 + .../apache/falcon/ExtensionClassLoaderTest.java | 55 ++++++ .../org/apache/falcon/ExtensionUtilTest.java | 66 +++++++ unit/src/test/resources/extension.properties | 23 +++ 19 files changed, 728 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 83b550f..15b1b32 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -54,7 +54,7 @@ public class FalconExtensionCLI { public static final String REGISTER_OPT = "register"; // Input parameters - public static final String ENTENSION_NAME_OPT = "extensionName"; + public static final String EXTENSION_NAME_OPT = "extensionName"; public static final String JOB_NAME_OPT = "jobName"; public static final String DESCRIPTION = "description"; public static final String PATH = "path"; @@ -69,7 +69,7 @@ public class FalconExtensionCLI { } String result; - String extensionName = commandLine.getOptionValue(ENTENSION_NAME_OPT); + String extensionName = commandLine.getOptionValue(EXTENSION_NAME_OPT); String jobName = commandLine.getOptionValue(JOB_NAME_OPT); String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT); String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT); @@ -80,40 +80,36 @@ public class FalconExtensionCLI { result = client.enumerateExtensions(); result = prettyPrintJson(result); } else if (optionsList.contains(DEFINITION_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.getExtensionDefinition(extensionName); result = prettyPrintJson(result); } else if (optionsList.contains(DESCRIBE_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.getExtensionDescription(extensionName); } else if (optionsList.contains(UNREGISTER_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.unregisterExtension(extensionName); }else if (optionsList.contains(DETAIL_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.getExtensionDetail(extensionName); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage(); + result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(REGISTER_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(path, PATH); result = client.registerExtension(extensionName, path, description); - }else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); - validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { @@ -129,7 +125,7 @@ public class FalconExtensionCLI { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.deleteExtensionJob(jobName, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) { - validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser, commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT), commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), @@ -203,7 +199,7 @@ public class FalconExtensionCLI { Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user"); Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false, "Use debug mode to see debugging statements on stdout"); - Option extensionName = new Option(ENTENSION_NAME_OPT, true, "Extension name"); + Option extensionName = new Option(EXTENSION_NAME_OPT, true, "Extension name"); Option jobName = new Option(JOB_NAME_OPT, true, "Extension job name"); Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status"); Option sortOrder = new Option(FalconCLIConstants.SORT_ORDER_OPT, true, "asc or desc order for results"); http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 9daa998..b8647f9 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -112,6 +112,10 @@ <artifactId>hive-webhcat-java-client</artifactId> <version>${hive.version}</version> </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-extensions</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java b/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java new file mode 100644 index 0000000..b1e88c3 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; + +/** + * Helper class loader that fetches jars from local disk and loads into JVM. + */ + +public class ExtensionClassLoader extends URLClassLoader{ + + public static final Logger LOG = LoggerFactory.getLogger(ExtensionClassLoader.class); + + public ExtensionClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + + public static ClassLoader load(final List<URL> urls) throws IOException { + final ClassLoader parentClassLoader = ExtensionClassLoader.class.getClassLoader(); + ClassLoader extensionClassLoader = java.security.AccessController.doPrivileged( + new java.security.PrivilegedAction<ExtensionClassLoader>() { + @Override + public ExtensionClassLoader run() { + return new ExtensionClassLoader(urls.toArray(new URL[urls.size()]), parentClassLoader); + } + } + ); + LOG.info("Created a new ExtensionClassLoader using classpath = {}", Arrays.toString(urls.toArray())); + return extensionClassLoader; + } + + @Override + protected void addURL(URL url) { + super.addURL(url); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java new file mode 100644 index 0000000..80df791 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.apache.commons.codec.CharEncoding; +import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.client.FalconExtensionConstants; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.extensions.ExtensionBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +/** + * Handler class that is responsible for preparing Extension entities. + */ +public final class ExtensionHandler { + + public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class); + private static final String UTF_8 = CharEncoding.UTF_8; + private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir")); + + public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName, + InputStream configStream) throws IOException, FalconException { + Thread.currentThread().setContextClassLoader(extensionClassloader); + + ServiceLoader<ExtensionBuilder> extensionBuilders = ServiceLoader.load(ExtensionBuilder.class); + + List<Class<? extends ExtensionBuilder>> result = new ArrayList<>(); + + for (ExtensionBuilder extensionBuilder : extensionBuilders) { + result.add(extensionBuilder.getClass()); + } + + if (result.isEmpty()) { + throw new FalconException("Extension Implementation not found in the package of : " + extensionName); + } else if (result.size() > 1) { + throw new FalconException("Found more than one extension Implementation in the package of : " + + extensionName); + } + + ExtensionBuilder extensionBuilder = null; + try { + Class<ExtensionBuilder> clazz = (Class<ExtensionBuilder>) extensionClassloader + .loadClass(result.get(0).getCanonicalName()); + extensionBuilder = clazz.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new FalconCLIException("Failed to instantiate extension implementation " + extensionName, e); + } + + extensionBuilder.validateExtensionConfig(extensionName, configStream); + List<Entity> entities = extensionBuilder.getEntities(jobName, configStream); + + return entities; + } + + public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream, + String extensionBuildLocation) throws IOException, FalconException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + String stagePath = createStagePath(extensionName, jobName); + List<URL> urls = ExtensionHandler.copyExtensionPackage(extensionBuildLocation, fs, stagePath); + + List<Entity> entities = prepare(extensionName, jobName, configStream, urls); + ExtensionHandler.stageEntities(entities, stagePath); + return entities; + } + + public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls) + throws IOException, FalconException { + ClassLoader extensionClassLoader = ExtensionClassLoader.load(urls); + ExtensionHandler extensionHandler = new ExtensionHandler(); + + return extensionHandler.getEntities(extensionClassLoader, extensionName, jobName, configStream); + } + + // This method is only for debugging, the staged entities can be found in /tmp path. + private static void stageEntities(List<Entity> entities, String stagePath) { + File entityFile; + EntityType type; + for (Entity entity : entities) { + type = entity.getEntityType(); + OutputStream out; + try { + entityFile = new File(stagePath + File.separator + entity.getEntityType().toString() + "_" + + URLEncoder.encode(entity.getName(), UTF_8)); + if (!entityFile.createNewFile()) { + LOG.debug("Not able to stage the entities in the tmp path"); + return; + } + out = new FileOutputStream(entityFile); + type.getMarshaller().marshal(entity, out); + LOG.debug("Staged configuration {}/{}", type, entity.getName()); + out.close(); + } catch (Exception e) { + LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e); + } + } + } + + private static String createStagePath(String extensionName, String jobName) { + String stagePath = TMP_BASE_DIR + File.separator + extensionName + File.separator + jobName + + File.separator + System.currentTimeMillis()/1000; + File tmpPath = new File(stagePath); + if (tmpPath.mkdir()) { + throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString()); + } + return stagePath; + } + + public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath) + throws IOException { + + Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS); + Path buildLibsPath = new Path(libsPath, FalconExtensionConstants.BUILD); + Path localStagePath = new Path(stagePath); + Path localBuildLibsPath = new Path(localStagePath, FalconExtensionConstants.LIBS); + LOG.info("Copying build time libs from {} to {}", buildLibsPath, localBuildLibsPath); + fs.copyToLocalFile(buildLibsPath, localBuildLibsPath); + + Path resourcesPath = new Path(extensionBuildUrl, FalconExtensionConstants.RESOURCES); + Path buildResourcesPath = new Path(resourcesPath, FalconExtensionConstants.BUILD); + Path localBuildResourcesPath = new Path(localStagePath, FalconExtensionConstants.RESOURCES); + LOG.info("Copying build time resources from {} to {}", buildLibsPath, localBuildResourcesPath); + fs.copyToLocalFile(buildResourcesPath, localBuildResourcesPath); + + List<URL> urls = new ArrayList<>(); + urls.addAll(getFilesInPath(localBuildLibsPath.toUri().toURL())); + urls.add(localBuildResourcesPath.toUri().toURL()); + return urls; + } + + public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException { + List<URL> urls = new ArrayList<>(); + + File file = new File(fileURL.getPath()); + if (file.isDirectory()) { + File[] files = file.listFiles(); + + if (files != null) { + for (File innerFile : files) { + if (innerFile.isFile()) { + urls.add(innerFile.toURI().toURL()); + } else { + urls.addAll(getFilesInPath(file.toURI().toURL())); + } + } + } + + if (!fileURL.toString().endsWith("/")) { + fileURL = new URL(fileURL.toString() + "/"); + } + } + + urls.add(fileURL); + return urls; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 5d6eff5..01dd6c6 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -179,6 +179,18 @@ public abstract class AbstractFalconClient { String properties); /** + * Prepare set of entities the extension has implemented and stage them to a local directory and submit them too. + * @param extensionName extension which is available in the store. + * @param jobName name to be used in all the extension entities' tagging that are built as part of + * loadAndPrepare. + * @param configPath path to extension parameters. + * @return + * @throws FalconCLIException + */ + public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath, + String doAsUser); + + /** * * Get list of the entities. * We have two filtering parameters for entity tags: "tags" and "tagkeys". http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index dabed3f..4d4517c 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -26,10 +26,14 @@ import com.sun.jersey.client.urlconnection.HTTPSProperties; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.util.TrustManagerUtils; +import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; +import org.apache.falcon.ExtensionHandler; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.DateValidator; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.metadata.RelationshipType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; @@ -47,6 +51,8 @@ import org.apache.falcon.resource.TriageResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; @@ -118,6 +124,7 @@ public class FalconClient extends AbstractFalconClient { public static final String DO_AS_OPT = "doAs"; + public static final String ENTITIES_OPT = "entities"; /** * Name of the HTTP cookie used for the authentication token between the client and the server. */ @@ -134,6 +141,7 @@ public class FalconClient extends AbstractFalconClient { return true; } }; + private static final String TAG_SEPARATOR = ","; private final WebResource service; private final AuthenticatedURL.Token authenticationToken; @@ -1059,12 +1067,48 @@ public class FalconClient extends AbstractFalconClient { return getResponse(String.class, clientResponse); } - public APIResult submitExtensionJob(final String extensionName, final String filePath, final String doAsUser) { - InputStream entityStream = getServletInputStream(filePath); + @Override + public APIResult submitExtensionJob(final String extensionName, final String jobName, final String configPath, + final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.DETAIL.path) + .call(ExtensionOperations.DETAIL); + JSONObject responseJson = clientResponse.getEntity(JSONObject.class); + ExtensionType extensionType; + String extensionBuildLocation; + try { + JSONObject extensionDetailsJson = new JSONObject(responseJson.get("detail").toString()); + extensionType = ExtensionType.valueOf(extensionDetailsJson.get("type").toString().toUpperCase()); + extensionBuildLocation = extensionDetailsJson.get("location").toString(); + } catch (JSONException e) { + OUT.get().print("Error. " + extensionName + " not found "); + return null; + } + InputStream configStream = getServletInputStream(configPath); + + List<Entity> entities; + if (extensionType.equals(ExtensionType.CUSTOM)) { + try { + entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, extensionBuildLocation); + } catch (Exception e) { + OUT.get().println("Error in building the extension"); + return null; + } + if (entities == null || entities.isEmpty()) { + OUT.get().println("No entities got built"); + return null; + } + try { + EntityUtil.applyTags(extensionName, jobName, entities); + } catch (FalconException e) { + OUT.get().println("Error in applying tags to generated entities"); + } + } + + clientResponse = new ResourceBuilder() .path(ExtensionOperations.SUBMIT.path, extensionName) .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.SUBMIT, entityStream); + .call(ExtensionOperations.SUBMIT, configStream); return getResponse(APIResult.class, clientResponse); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java b/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java new file mode 100644 index 0000000..086a8bb --- /dev/null +++ b/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.client; + +/** + * Falcon Extensions Constants. + */ +public final class FalconExtensionConstants { + private FalconExtensionConstants() { + + } + + public static final String SERVICES = "SERVICES"; + public static final String META_INF = "META-INF"; + public static final String LIBS = "libs"; + public static final String RESOURCES = "resources"; + public static final String BUILD = "build"; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index f3d5d28..183661b 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -93,8 +93,12 @@ public final class EntityUtil { public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; public static final String WF_LIB_SEPARATOR = ","; + public static final String TAG_SEPARATOR = ","; private static final String STAGING_DIR_NAME_SEPARATOR = "_"; + public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; + public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; + public static final ThreadLocal<SimpleDateFormat> PATH_FORMAT = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { @@ -1194,6 +1198,48 @@ public final class EntityUtil { } } + /** + * Set the tags to a given entity. + * @param entity + * @param tags + */ + public static void setEntityTags(Entity entity, String tags) { + switch (entity.getEntityType()) { + case PROCESS: + ((Process) entity).setTags(tags); + break; + case FEED: + ((Feed) entity).setTags(tags); + break; + case CLUSTER: + ((Cluster) entity).setTags(tags); + break; + default: + throw new IllegalArgumentException("Unhandled entity type " + entity.getEntityType()); + } + } + + public static void applyTags(String extensionName, String jobName, List<Entity> entities) throws FalconException { + for (Entity entity : entities) { + String tags = entity.getTags(); + if (StringUtils.isNotEmpty(tags)) { + if (tags.contains(TAG_PREFIX_EXTENSION_NAME)) { + throw new FalconException("Generated extension entity " + entity.getName() + + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_NAME); + } + if (tags.contains(TAG_PREFIX_EXTENSION_JOB)) { + throw new FalconException("Generated extension entity " + entity.getName() + + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_JOB); + } + setEntityTags(entity, tags + TAG_SEPARATOR + TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR + + TAG_PREFIX_EXTENSION_JOB + jobName); + } else { + setEntityTags(entity, TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR + + TAG_PREFIX_EXTENSION_JOB + jobName); + } + } + } + /** * @param properties - String of format key1:value1, key2:value2 http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki b/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki index 40a7b44..2cda478 100644 --- a/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki +++ b/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki @@ -5,7 +5,7 @@ Submit an extension job. Usage: -$FALCON_HOME/bin/falcon extension -submit -extensionName <<extension-name>> -file <<path-to-file>> +$FALCON_HOME/bin/falcon extension -submit -extensionName <<extension-name>> -jobName <<job-name>> -file <<path-to-file>> Optional Args : -doAs <<user-name>> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki b/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki index ea1c0e2..4dbd76d 100644 --- a/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki +++ b/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki @@ -5,7 +5,7 @@ Submit and schedule an extension job. Usage: -$FALCON_HOME/bin/falcon extension -submitAndSchedule -extensionName <<extension-name>> -file <<path-to-file>> +$FALCON_HOME/bin/falcon extension -submitAndSchedule -extensionName <<extension-name>> -jobName <<job-name>> -file <<path-to-file>> Optional Args : -doAs <<user-name>> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/java/org/apache/falcon/ExtensionExample.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java new file mode 100644 index 0000000..f527f2e --- /dev/null +++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Schema; +import org.apache.falcon.extensions.ExtensionBuilder; + +import javax.xml.bind.JAXBException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Extension Example for testing extension loading and preparing entities. + */ +public class ExtensionExample implements ExtensionBuilder{ + + public static final String PROCESS_XML = "/process.xml"; + + @Override + public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException { + Entity process; + try { + process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(PROCESS_XML)); + } catch (JAXBException e) { + throw new FalconException("Failed in unmarshalling the entity"); + } + List<Entity> entities = new ArrayList<>(); + entities.add(process); + return entities; + } + + @Override + public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream) + throws FalconException { + + } + + @Override + public List<Pair<String, Schema>> getOutputSchemas(String extensionName) throws FalconException { + return null; + } + + public String toString(String testString) { + return testString; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder new file mode 100644 index 0000000..a8d3cf8 --- /dev/null +++ b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + org.apache.falcon.ExtensionExample \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/resources/process.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/process.xml b/extensions/src/test/resources/process.xml new file mode 100644 index 0000000..48e3a16 --- /dev/null +++ b/extensions/src/test/resources/process.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<process name="sample" version="0" xmlns="uri:falcon:process:0.1"> + <tags>[email protected],[email protected],_department_type=forecasting</tags> + <pipelines>testPipeline</pipelines> + <clusters> + <cluster name="testCluster"> + <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/> + </cluster> + </clusters> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/> + + <!-- what --> + <inputs> + <input name="impression" feed="impressionFeed" start="today(0,0)" end="today(2,0)" partition="*/US"/> + <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/> + </inputs> + + <outputs> + <output name="impOutput" feed="imp-click-join1" instance="today(0,0)"/> + <output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/> + </outputs> + + <!-- how --> + <properties> + <property name="name1" value="value1"/> + <property name="name2" value="value2"/> + </properties> + + <workflow engine="oozie" path="/falcon/test/workflow"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> + + <late-process policy="exp-backoff" delay="hours(1)"> + <late-input input="impression" workflow-path="himpression/late/workflow"/> + <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/> + </late-process> + + <notification type="email" to="falcon@localhost"/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 266e631..6f2974d 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -21,12 +21,10 @@ package org.apache.falcon.resource.extensions; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.Extension; import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.ExtensionService; @@ -517,44 +515,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { List<Entity> entities = extension.getEntities(extensionName, request.getInputStream()); // add tags on extension name and job - for (Entity entity : entities) { - String tags = entity.getTags(); - if (StringUtils.isNotEmpty(tags)) { - if (tags.contains(TAG_PREFIX_EXTENSION_NAME)) { - throw new FalconException("Generated extention entity " + entity.getName() - + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_NAME); - } - if (tags.contains(TAG_PREFIX_EXTENSION_JOB)) { - throw new FalconException("Generated extention entity " + entity.getName() - + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_JOB); - } - setEntityTags(entity, tags + TAG_SEPARATOR + TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR - + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName())); - } else { - setEntityTags(entity, TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR - + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName())); - } - } + String jobName = properties.getProperty(ExtensionProperties.JOB_NAME.getName()); + EntityUtil.applyTags(extensionName, jobName, entities); return entities; } - private void setEntityTags(Entity entity, String tags) { - switch (entity.getEntityType()) { - case PROCESS: - ((Process) entity).setTags(tags); - break; - case FEED: - ((Feed) entity).setTags(tags); - break; - case CLUSTER: - ((Cluster) entity).setTags(tags); - break; - default: - LOG.error("Unknown entity type: {}", entity.getEntityType().name()); - } - } - private JSONObject buildDetailResult(final String extensionName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/pom.xml ---------------------------------------------------------------------- diff --git a/unit/pom.xml b/unit/pom.xml index 24b39b7..6405460 100644 --- a/unit/pom.xml +++ b/unit/pom.xml @@ -132,6 +132,29 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>test</id> + <phase>process-sources</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>falcon-extensions</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </artifactItem> + </artifactItems> + <outputDirectory>${basedir}/src/test/resources</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 53073f0..7248964 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -266,6 +266,12 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override + public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { + //TODO Make falcon unit client changes for submitting recipe too. + throw new UnsupportedOperationException("Not yet Implemented"); + } + + @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) { http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java b/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java new file mode 100644 index 0000000..dbd8603 --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * Test Class for validating Extension Class Loader. + */ +public class ExtensionClassLoaderTest { + public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; + + @Test + public void testManuallyLoadedClass() throws Exception{ + + List<URL> urls = new ArrayList<>(); + + urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL())); + + ClassLoader loader = ExtensionClassLoader.load(urls); + ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); + Class<?> classManuallyLoaded = loader.loadClass("org.apache.falcon.ExtensionExample"); + + Object exampleExtension = classManuallyLoaded.newInstance(); + + Method methodToString = classManuallyLoaded.getMethod("toString", String.class); + + Thread.currentThread().setContextClassLoader(previousClassLoader); + Assert.assertEquals("test", methodToString.invoke(exampleExtension, "test")); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java b/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java new file mode 100644 index 0000000..7e931d7 --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * Test Class for validating Extension util helper methods. + */ +public class ExtensionUtilTest { + public static final String PROCESS_XML = "/process.xml"; + public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; + public static final String CONFIG_PATH = "file:///" + System.getProperty("user.dir") + + "/src/test/resources/extension.properties"; + + @Test + public void testPrepareAndSetEntityTags() throws Exception { + Entity process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(PROCESS_XML)); + EntityUtil.setEntityTags(process, "testTag"); + Assert.assertTrue(EntityUtil.getTags(process).contains("testTag")); + + List<URL> urls = new ArrayList<>(); + + InputStream configStream = null; + try { + configStream = new FileInputStream(CONFIG_PATH); + } catch (FileNotFoundException e) { + //ignore + } + + urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL())); + List<Entity> entities = ExtensionHandler.prepare("extensionName", "jobName", configStream, urls); + Assert.assertEquals(entities.size(), 1); + Assert.assertEquals(entities.get(0), process); + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/resources/extension.properties ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/extension.properties b/unit/src/test/resources/extension.properties new file mode 100644 index 0000000..d52de1e --- /dev/null +++ b/unit/src/test/resources/extension.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#################################################### +#### This is used for falcon packaging only. #### +#################################################### + +pipelines.name=test
