Repository: storm
Updated Branches:
  refs/heads/STORM-942-cherrypick [created] 9c3dfc986


Added parseInputStream() method

Included reusable parseInputStream for use by parseFile and parseResource. Also 
allows a path for programatically creating topologies without the need to write 
to / read from disk.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11823a72
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11823a72
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11823a72

Branch: refs/heads/STORM-942-cherrypick
Commit: 11823a722749ba77c4b207ad5d369df69fd2551b
Parents: e215767
Author: Brendan W. Lyon <bl...@datainterfuse.com>
Authored: Thu Jul 16 13:48:00 2015 -0400
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Fri Jul 17 06:57:28 2015 +0900

----------------------------------------------------------------------
 .../apache/storm/flux/parser/FluxParser.java    | 57 +++++++++++---------
 1 file changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11823a72/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git 
a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
 
b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 72f8a8e..27ff677 100644
--- 
a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ 
b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -44,40 +44,47 @@ public class FluxParser {
     // TODO refactor input stream processing (see parseResource() method).
     public static TopologyDef parseFile(String inputFile, boolean dumpYaml, 
boolean processIncludes,
                                         String propertiesFile, boolean envSub) 
throws IOException {
-        Yaml yaml = yaml();
+   
         FileInputStream in = new FileInputStream(inputFile);
-        // TODO process properties, etc.
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, 
propertiesFile, envSub);
         in.close();
-        if(dumpYaml){
-            dumpYaml(topology, yaml);
-        }
-        if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile, envSub);
-        } else {
-            return topology;
-        }
+        
+        return topology;
     }
 
     public static TopologyDef parseResource(String resource, boolean dumpYaml, 
boolean processIncludes,
                                             String propertiesFile, boolean 
envSub) throws IOException {
-        Yaml yaml = yaml();
+        
         InputStream in = FluxParser.class.getResourceAsStream(resource);
-        if(in == null){
-            LOG.error("Unable to load classpath resource: " + resource);
-            System.exit(1);
-        }
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, 
propertiesFile, envSub);
         in.close();
-        if(dumpYaml){
-            dumpYaml(topology, yaml);
-        }
-        if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile, envSub);
-        } else {
-            return topology;
-        }
+        
+        return topology;
     }
+    
+    public static TopologyDef parseInputStream(InputStream inputStream, 
boolean dumpYaml, boolean processIncludes,
+            String propertiesFile, boolean envSub) throws IOException {
+               
+       Yaml yaml = yaml();
+       
+               if (inputStream == null) {
+                       LOG.error("Unable to load input stream");
+                       System.exit(1);
+               }
+               
+               TopologyDef topology = loadYaml(yaml, inputStream, 
propertiesFile, envSub);
+               inputStream.close();
+               
+               if (dumpYaml) {
+                       dumpYaml(topology, yaml);
+               }
+               
+               if (processIncludes) {
+                       return processIncludes(yaml, topology, propertiesFile, 
envSub);
+               } else {
+                       return topology;
+               }
+       }
 
     private static TopologyDef loadYaml(Yaml yaml, InputStream in, String 
propsFile, boolean envSubstitution) throws IOException {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();

Reply via email to