FALCON-2288 overwriting jobName during job submission for trusted extensions
Author: Pracheer Agarwal <[email protected]> Author: Pracheer Agarwal <[email protected]> Author: Pracheer Agarwal <[email protected]> Author: Pracheer <[email protected]> Reviewers: @pallavi-rao,@sandeepSamudrala Closes #373 from PracheerAgarwal/FALCON-2288 and squashes the following commits: caa25d7 [Pracheer Agarwal] FALCON-2288 code restructuring d775b0d [Pracheer Agarwal] fixing checkstyle failures fcaf14a [Pracheer Agarwal] FALCON-2288, overwriting job_name for trusted extension 83d9873 [Pracheer Agarwal] FALCON-2288, overwriting job_name for trusted extension 41da933 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon fc6ef2b [Pracheer] Merge branch 'master' of https://github.com/apache/falcon 7814fba [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon ba60452 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon ed65aa0 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 9ff05df [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 9c2f0a5 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 9cd8c17 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 778c579 [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon e39808d [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon a932633 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon fda3b28 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon a93d71a [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon e3728d5 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 066c8e2 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon b20f044 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon 7f572a1 [Pracheer Agarwal] Merge branch 'master' of 
https://github.com/apache/falcon 46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions 622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions (cherry picked from commit ded34d4bdd6367e7b271fe07800d7a856f93abfc) Signed-off-by: Pallavi Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3eea94b8 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3eea94b8 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3eea94b8 Branch: refs/heads/master Commit: 3eea94b833187524153b294ad025aba5cb20bf55 Parents: 23229a9 Author: Pracheer Agarwal <[email protected]> Authored: Thu Mar 9 10:36:53 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Mar 9 10:37:17 2017 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/extensions/Extension.java | 4 -- .../resource/proxy/ExtensionManagerProxy.java | 41 +++++++++++++------- 2 files changed, 27 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3eea94b8/extensions/src/main/java/org/apache/falcon/extensions/Extension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java index 6b94323..3869718 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java @@ -21,7 +21,6 @@ package org.apache.falcon.extensions; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; -import 
org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.feed.Schema; import org.apache.falcon.extensions.store.ExtensionStore; @@ -110,9 +109,6 @@ public class Extension implements ExtensionBuilder { throw new FalconException("Entity created from the extension template cannot be null"); } LOG.info("Extension processing complete"); - // add tags on extension name and job - String jobName = configProperties.getProperty(ExtensionProperties.JOB_NAME.getName()); - EntityUtil.applyTags(extensionName, jobName, Collections.singletonList(entity)); return Collections.singletonList(entity); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3eea94b8/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 033b6cc..9808892 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.Properties; import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -57,6 +58,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.Extension; +import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.ExtensionService; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; @@ -300,26 +302,37 @@ public class ExtensionManagerProxy extends 
AbstractExtensionManager { List<Entity> entities; TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); if (ExtensionType.TRUSTED.equals(extensionType)) { - entities = extension.getEntities(extensionName, config); - List<Entity> trustedFeeds = new ArrayList<>(); - List<Entity> trustedProcesses = new ArrayList<>(); + entities = extension.getEntities(jobName, addJobNameToConf(config, jobName)); + feeds = new ArrayList<>(); + processes = new ArrayList<>(); for (Entity entity : entities) { if (EntityType.FEED.equals(entity.getEntityType())) { - trustedFeeds.add(entity); + feeds.add(entity); } else { - trustedProcesses.add(entity); + processes.add(entity); } } - entityMap.put(EntityType.PROCESS, trustedProcesses); - entityMap.put(EntityType.FEED, trustedFeeds); - return entityMap; - } else { - EntityUtil.applyTags(extensionName, jobName, processes); - EntityUtil.applyTags(extensionName, jobName, feeds); - entityMap.put(EntityType.PROCESS, processes); - entityMap.put(EntityType.FEED, feeds); - return entityMap; } + // add tags on extension name and job + EntityUtil.applyTags(extensionName, jobName, processes); + EntityUtil.applyTags(extensionName, jobName, feeds); + entityMap.put(EntityType.PROCESS, processes); + entityMap.put(EntityType.FEED, feeds); + return entityMap; + } + + private InputStream addJobNameToConf(InputStream conf, String jobName) throws FalconException{ + Properties inputProperties = new Properties(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + try { + inputProperties.load(conf); + inputProperties.setProperty(ExtensionProperties.JOB_NAME.getName(), jobName); + inputProperties.store(output, null); + } catch (IOException e) { + LOG.error("Error in reading the config stream"); + throw new FalconException("Error while reading the config stream", e); + } + return new ByteArrayInputStream(output.toByteArray()); } private ExtensionType getExtensionType(String extensionName) {
