implement variable expansion/substitution
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/79b405a3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/79b405a3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/79b405a3 Branch: refs/heads/master Commit: 79b405a3225a78a293eda279a8d4dccd2cba6230 Parents: 5b1d10a Author: P. Taylor Goetz <[email protected]> Authored: Mon Mar 30 15:32:14 2015 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Mon Mar 30 15:32:14 2015 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/storm/flux/Flux.java | 9 +- .../apache/storm/flux/parser/FluxParser.java | 29 +++--- .../java/org/apache/storm/flux/TCKTest.java | 13 +++ .../resources/configs/substitution-test.yaml | 104 +++++++++++++++++++ .../src/test/resources/configs/test.properties | 2 + .../flux/wrappers/bolts/FluxShellBolt.java | 3 +- .../flux/wrappers/spouts/FluxShellSpout.java | 3 +- 7 files changed, 143 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/main/java/org/apache/storm/flux/Flux.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java index 3910e10..dcd3953 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java +++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@ -124,19 +124,18 @@ public class Flux { String filePath = (String)cmd.getArgList().get(0); // TODO conditionally load properties from a file our resource - Properties props = new Properties(); + String filterProps = null; if(cmd.hasOption(OPTION_FILTER)){ -// InputStream in = new FileInputStream(); - props.load((InputStream)null); + filterProps = cmd.getOptionValue(OPTION_FILTER); } if(cmd.hasOption(OPTION_RESOURCE)){ printf("Parsing classpath resource: %s", filePath); - topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, props); + topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps); } else { printf("Parsing file: %s", new File(filePath).getAbsolutePath()); - topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, props); + topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps); } http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 40cfe57..109330d 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -43,39 +43,39 @@ public class FluxParser { // TODO refactor input stream processing (see parseResource() method). public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, - Properties properties) throws IOException { + String propertiesFile) throws IOException { Yaml yaml = yaml(); FileInputStream in = new FileInputStream(inputFile); // TODO process properties, etc. - TopologyDef topology = loadYaml(yaml, in, properties); + TopologyDef topology = loadYaml(yaml, in, propertiesFile); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology, properties); + return processIncludes(yaml, topology, propertiesFile); } else { return topology; } } public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, - Properties properties) throws IOException { + String propertiesFile) throws IOException { Yaml yaml = yaml(); InputStream in = FluxParser.class.getResourceAsStream(resource); - TopologyDef topology = loadYaml(yaml, in, properties); + TopologyDef topology = loadYaml(yaml, in, propertiesFile); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology, properties); + return processIncludes(yaml, topology, propertiesFile); } else { return topology; } } - private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties) throws IOException { + private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); LOG.info("loading YAML from input stream..."); int b = -1; @@ -83,10 +83,13 @@ public class FluxParser { bos.write(b); } String str = bos.toString(); - if(properties != null){ + if(propsFile != null){ LOG.info("Performing property substitution."); - for(Object key : properties.keySet()){ - str = str.replace("${" + key + "}", (String)properties.get(key)); + InputStream propsIn = new FileInputStream(propsFile); + Properties props = new Properties(); + props.load(propsIn); + for(Object key : props.keySet()){ + str = str.replace("${" + key + "}", props.getProperty((String)key)); } } else { LOG.info("Not performing property substitution."); @@ -117,17 +120,17 @@ public class FluxParser { * @param topologyDef the topology definition containing (possibly zero) includes * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties props) throws IOException { + private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile) throws IOException { //TODO support multiple levels of includes if(topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()){ TopologyDef includeTopologyDef = null; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); - includeTopologyDef = parseResource(include.getFile(), true, false, props); + includeTopologyDef = parseResource(include.getFile(), true, false, propsFile); } else { LOG.info("Loading includes from file: {}", include.getFile()); - includeTopologyDef = parseFile(include.getFile(), true, false, props); + includeTopologyDef = parseFile(include.getFile(), true, false, propsFile); } // if overrides are disabled, we won't replace anything that already exists http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java ---------------------------------------------------------------------- diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index f84130b..f6076cc 100644 --- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -187,4 +187,17 @@ public class TCKTest { assertTrue(bolt.getBar().equals("bar")); assertTrue(bolt.getFooBar().equals("foobar")); } + + @Test + public void testVariableSubstitution() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties"); + assertTrue(topologyDef.validate()); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + topology.validate(); + + assertTrue(context.getTopologyDef().getName().equals("substitution-topology")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/resources/configs/substitution-test.yaml ---------------------------------------------------------------------- diff --git a/flux-core/src/test/resources/configs/substitution-test.yaml b/flux-core/src/test/resources/configs/substitution-test.yaml new file mode 100644 index 0000000..cbfeea4 --- /dev/null +++ b/flux-core/src/test/resources/configs/substitution-test.yaml @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test ability to wire together shell spouts/bolts +--- + +# topology definition +# name to be used when submitting +name: "${topology.name}" + +# Components +# Components are analagous to Spring beans. They are meant to be used as constructor, +# property(setter), and builder arguments. +#components: +# - id: "myComponent" +# className: "com.foo.bar.MyComponent" +# constructorArgs: +# - ... +# properties: +# foo: "bar" +# bar: "foo" + +# NOTE: We may want to consider some level of spring integration. For example, allowing component references +# to a spring `ApplicationContext`. + +# topology configuration +# this will be passed to the submitter as a map of config options +# +config: + topology.workers: 1 + # ... + +# spout definitions +spouts: + - id: "sentence-spout" + className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" + # shell spout constructor takes 2 arguments: String[], String[] + constructorArgs: + # command line + - ["node", "randomsentence.js"] + # output fields + - ["word"] + parallelism: 1 + # ... + +# bolt definitions +bolts: + - id: "splitsentence" + className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" + constructorArgs: + # command line + - ["python", "splitsentence.py"] + # output fields + - ["word"] + parallelism: 1 + # ... + + - id: "log" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + # ... + + - id: "count" + className: "backtype.storm.testing.TestWordCounter" + parallelism: 1 + # ... + +#stream definitions +# stream definitions define connections between spouts and bolts. +# note that such connections can be cyclical +# custom stream groupings are also supported + +streams: + - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.) + from: "sentence-spout" + to: "splitsentence" + grouping: + type: SHUFFLE + + - name: "split --> count" + from: "splitsentence" + to: "count" + grouping: + type: FIELDS + args: ["word"] + + - name: "count --> log" + from: "count" + to: "log" + grouping: + type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-core/src/test/resources/configs/test.properties ---------------------------------------------------------------------- diff --git a/flux-core/src/test/resources/configs/test.properties b/flux-core/src/test/resources/configs/test.properties new file mode 100644 index 0000000..0730d5f --- /dev/null +++ b/flux-core/src/test/resources/configs/test.properties @@ -0,0 +1,2 @@ +topology.name: substitution-topology +some.other.property: foo bar \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java ---------------------------------------------------------------------- diff --git a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java index d88b18a..4e0f91c 100644 --- a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java +++ b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java @@ -31,6 +31,7 @@ import java.util.Map; */ public class FluxShellBolt extends ShellBolt implements IRichBolt{ private String[] outputFields; + private Map<String, Object> componentConfig; /** * Create a ShellBolt with command line arguments and output fields @@ -50,6 +51,6 @@ public class FluxShellBolt extends ShellBolt implements IRichBolt{ @Override public Map<String, Object> getComponentConfiguration() { - return null; + return this.componentConfig; } } http://git-wip-us.apache.org/repos/asf/storm/blob/79b405a3/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java ---------------------------------------------------------------------- diff --git a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java index bfdfbf2..c7e9058 100644 --- a/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java +++ b/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java @@ -31,6 +31,7 @@ import java.util.Map; */ public class FluxShellSpout extends ShellSpout implements IRichSpout { private String[] outputFields; + private Map<String, Object> componentConfig; /** * Create a ShellSpout with command line arguments and output fields @@ -49,6 +50,6 @@ public class FluxShellSpout extends ShellSpout implements IRichSpout { @Override public Map<String, Object> getComponentConfiguration() { - return null; + return this.componentConfig; } }
