implement variable expansion/substitution

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/79b405a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/79b405a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/79b405a3

Branch: refs/heads/master
Commit: 79b405a3225a78a293eda279a8d4dccd2cba6230
Parents: 5b1d10a
Author: P. Taylor Goetz <[email protected]>
Authored: Mon Mar 30 15:32:14 2015 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Mon Mar 30 15:32:14 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java   |   9 +-
 .../apache/storm/flux/parser/FluxParser.java    |  29 +++---
 .../java/org/apache/storm/flux/TCKTest.java     |  13 +++
 .../resources/configs/substitution-test.yaml    | 104 +++++++++++++++++++
 .../src/test/resources/configs/test.properties  |   2 +
 .../flux/wrappers/bolts/FluxShellBolt.java      |   3 +-
 .../flux/wrappers/spouts/FluxShellSpout.java    |   3 +-
 7 files changed, 143 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 3910e10..dcd3953 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -124,19 +124,18 @@ public class Flux {
         String filePath = (String)cmd.getArgList().get(0);
 
         // TODO conditionally load properties from a file our resource
-        Properties props = new Properties();
+        String filterProps = null;
         if(cmd.hasOption(OPTION_FILTER)){
-//            InputStream in = new FileInputStream();
-            props.load((InputStream)null);
+            filterProps = cmd.getOptionValue(OPTION_FILTER);
         }
 
         if(cmd.hasOption(OPTION_RESOURCE)){
             printf("Parsing classpath resource: %s", filePath);
-            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, props);
+            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps);
         } else {
             printf("Parsing file: %s",
                     new File(filePath).getAbsolutePath());
-            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, props);
+            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps);
         }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 40cfe57..109330d 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -43,39 +43,39 @@ public class FluxParser {
 
     // TODO refactor input stream processing (see parseResource() method).
     public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
-                                        Properties properties) throws IOException {
+                                        String propertiesFile) throws IOException {
         Yaml yaml = yaml();
         FileInputStream in = new FileInputStream(inputFile);
         // TODO process properties, etc.
-        TopologyDef topology = loadYaml(yaml, in, properties);
+        TopologyDef topology = loadYaml(yaml, in, propertiesFile);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology, properties);
+            return processIncludes(yaml, topology, propertiesFile);
         } else {
             return topology;
         }
     }
 
     public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
-                                            Properties properties) throws IOException {
+                                            String propertiesFile) throws IOException {
         Yaml yaml = yaml();
         InputStream in = FluxParser.class.getResourceAsStream(resource);
-        TopologyDef topology = loadYaml(yaml, in, properties);
+        TopologyDef topology = loadYaml(yaml, in, propertiesFile);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology, properties);
+            return processIncludes(yaml, topology, propertiesFile);
         } else {
             return topology;
         }
     }
 
-    private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties) throws IOException {
+    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile) throws IOException {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         LOG.info("loading YAML from input stream...");
         int b = -1;
@@ -83,10 +83,13 @@ public class FluxParser {
             bos.write(b);
         }
         String str = bos.toString();
-        if(properties != null){
+        if(propsFile != null){
             LOG.info("Performing property substitution.");
-            for(Object key : properties.keySet()){
-                str = str.replace("${" + key + "}", (String)properties.get(key));
+            InputStream propsIn = new FileInputStream(propsFile);
+            Properties props = new Properties();
+            props.load(propsIn);
+            for(Object key : props.keySet()){
+                str = str.replace("${" + key + "}", props.getProperty((String)key));
             }
         } else {
             LOG.info("Not performing property substitution.");
@@ -117,17 +120,17 @@ public class FluxParser {
      * @param topologyDef the topology definition containing (possibly zero) includes
      * @return The TopologyDef with includes resolved.
      */
-    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties props) throws IOException {
+    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile) throws IOException {
         //TODO support multiple levels of includes
         if(topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()){
                 TopologyDef includeTopologyDef = null;
                 if (include.isResource()) {
                     LOG.info("Loading includes from resource: {}", include.getFile());
-                    includeTopologyDef = parseResource(include.getFile(), true, false, props);
+                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile);
                 } else {
                     LOG.info("Loading includes from file: {}", include.getFile());
-                    includeTopologyDef = parseFile(include.getFile(), true, false, props);
+                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile);
                 }
 
                 // if overrides are disabled, we won't replace anything that already exists

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index f84130b..f6076cc 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -187,4 +187,17 @@ public class TCKTest {
         assertTrue(bolt.getBar().equals("bar"));
         assertTrue(bolt.getFooBar().equals("foobar"));
     }
+
+    @Test
+    public void testVariableSubstitution() throws Exception {
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties");
+        assertTrue(topologyDef.validate());
+        Config conf = FluxBuilder.buildConfig(topologyDef);
+        ExecutionContext context = new ExecutionContext(topologyDef, conf);
+        StormTopology topology = FluxBuilder.buildTopology(context);
+        assertNotNull(topology);
+        topology.validate();
+
+        assertTrue(context.getTopologyDef().getName().equals("substitution-topology"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/resources/configs/substitution-test.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/substitution-test.yaml b/flux-core/src/test/resources/configs/substitution-test.yaml
new file mode 100644
index 0000000..cbfeea4
--- /dev/null
+++ b/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test ability to wire together shell spouts/bolts
+---
+
+# topology definition
+# name to be used when submitting
+name: "${topology.name}"
+
+# Components
+# Components are analagous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#components:
+#  - id: "myComponent"
+#    className: "com.foo.bar.MyComponent"
+#    constructorArgs:
+#      - ...
+#    properties:
+#      foo: "bar"
+#      bar: "foo"
+
+# NOTE: We may want to consider some level of spring integration. For example, allowing component references
+# to a spring `ApplicationContext`.
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+  # ...
+
+# spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+    # ...
+
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+    # ...
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+    # ...
+
+  - id: "count"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+    # ...
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "sentence-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/resources/configs/test.properties
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/test.properties b/flux-core/src/test/resources/configs/test.properties
new file mode 100644
index 0000000..0730d5f
--- /dev/null
+++ b/flux-core/src/test/resources/configs/test.properties
@@ -0,0 +1,2 @@
+topology.name: substitution-topology
+some.other.property: foo bar
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
----------------------------------------------------------------------
diff --git a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
index d88b18a..4e0f91c 100644
--- a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
+++ b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
@@ -31,6 +31,7 @@ import java.util.Map;
  */
 public class FluxShellBolt extends ShellBolt implements IRichBolt{
     private String[] outputFields;
+    private Map<String, Object> componentConfig;
 
     /**
      * Create a ShellBolt with command line arguments and output fields
@@ -50,6 +51,6 @@ public class FluxShellBolt extends ShellBolt implements IRichBolt{
 
     @Override
     public Map<String, Object> getComponentConfiguration() {
-        return null;
+        return this.componentConfig;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
----------------------------------------------------------------------
diff --git a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
index bfdfbf2..c7e9058 100644
--- a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
+++ b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
@@ -31,6 +31,7 @@ import java.util.Map;
  */
 public class FluxShellSpout extends ShellSpout implements IRichSpout {
     private String[] outputFields;
+    private Map<String, Object> componentConfig;
 
     /**
      * Create a ShellSpout with command line arguments and output fields
@@ -49,6 +50,6 @@ public class FluxShellSpout extends ShellSpout implements IRichSpout {
 
     @Override
     public Map<String, Object> getComponentConfiguration() {
-        return null;
+        return this.componentConfig;
     }
 }

Reply via email to