Repository: storm Updated Branches: refs/heads/STORM-942-cherrypick [created] 9c3dfc986
Added parseInputStream() method. Included reusable parseInputStream for use by parseFile and parseResource. Also allows a path for programmatically creating topologies without the need to write to / read from disk. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11823a72 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11823a72 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11823a72 Branch: refs/heads/STORM-942-cherrypick Commit: 11823a722749ba77c4b207ad5d369df69fd2551b Parents: e215767 Author: Brendan W. Lyon <bl...@datainterfuse.com> Authored: Thu Jul 16 13:48:00 2015 -0400 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Fri Jul 17 06:57:28 2015 +0900 ---------------------------------------------------------------------- .../apache/storm/flux/parser/FluxParser.java | 57 +++++++++++--------- 1 file changed, 32 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/11823a72/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java ---------------------------------------------------------------------- diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 72f8a8e..27ff677 100644 --- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -44,40 +44,47 @@ public class FluxParser { // TODO refactor input stream processing (see parseResource() method). 
public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, String propertiesFile, boolean envSub) throws IOException { - Yaml yaml = yaml(); + FileInputStream in = new FileInputStream(inputFile); - // TODO process properties, etc. - TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); + TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub); in.close(); - if(dumpYaml){ - dumpYaml(topology, yaml); - } - if(processIncludes) { - return processIncludes(yaml, topology, propertiesFile, envSub); - } else { - return topology; - } + + return topology; } public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, String propertiesFile, boolean envSub) throws IOException { - Yaml yaml = yaml(); + InputStream in = FluxParser.class.getResourceAsStream(resource); - if(in == null){ - LOG.error("Unable to load classpath resource: " + resource); - System.exit(1); - } - TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); + TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub); in.close(); - if(dumpYaml){ - dumpYaml(topology, yaml); - } - if(processIncludes) { - return processIncludes(yaml, topology, propertiesFile, envSub); - } else { - return topology; - } + + return topology; } + + public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml, boolean processIncludes, + String propertiesFile, boolean envSub) throws IOException { + + Yaml yaml = yaml(); + + if (inputStream == null) { + LOG.error("Unable to load input stream"); + System.exit(1); + } + + TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub); + inputStream.close(); + + if (dumpYaml) { + dumpYaml(topology, yaml); + } + + if (processIncludes) { + return processIncludes(yaml, topology, propertiesFile, envSub); + } else { + return topology; + } + } private static TopologyDef loadYaml(Yaml yaml, 
InputStream in, String propsFile, boolean envSubstitution) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream();