Repository: storm Updated Branches: refs/heads/master a55bbbea8 -> 285d943b8
preliminary implementation for variable substitution Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b1d10ab Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b1d10ab Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b1d10ab Branch: refs/heads/master Commit: 5b1d10aba4118d854ca06840100b1e8da98bea34 Parents: adf91d9 Author: P. Taylor Goetz <[email protected]> Authored: Mon Mar 30 01:40:32 2015 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Mon Mar 30 01:40:32 2015 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/storm/flux/Flux.java | 18 +++++++- .../apache/storm/flux/parser/FluxParser.java | 45 +++++++++++++++----- .../java/org/apache/storm/flux/TCKTest.java | 28 ++++++------ .../src/test/resources/configs/hdfs_test.yaml | 8 ++++ 4 files changed, 72 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/main/java/org/apache/storm/flux/Flux.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java index 4583bae..3910e10 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java +++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.util.Map; +import java.util.Properties; /** * Flux entry point. @@ -52,6 +54,7 @@ public class Flux { private static final String OPTION_NO_SPLASH = "no-splash"; private static final String OPTION_INACTIVE = "inactive"; private static final String OPTION_ZOOKEEPER = "zookeeper"; + private static final String OPTION_FILTER = "filter"; public static void main(String[] args) throws Exception { Options options = new Options(); @@ -77,6 +80,9 @@ public class Flux { options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " + "specified <host>:<port> instead of the in-process ZooKeeper.")); + options.addOption(option(1, "f", OPTION_FILTER, "file", "Use the specified file as a source of properties, and " + + "perform variable substitution.")); + CommandLineParser parser = new BasicParser(); CommandLine cmd = parser.parse(options, args); @@ -116,13 +122,21 @@ public class Flux { TopologyDef topologyDef = null; String filePath = (String)cmd.getArgList().get(0); + + // TODO conditionally load properties from a file our resource + Properties props = new Properties(); + if(cmd.hasOption(OPTION_FILTER)){ +// InputStream in = new FileInputStream(); + props.load((InputStream)null); + } + if(cmd.hasOption(OPTION_RESOURCE)){ printf("Parsing classpath resource: %s", filePath); - topologyDef = FluxParser.parseResource(filePath, dumpYaml, true); + topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, props); } else { printf("Parsing file: %s", new File(filePath).getAbsolutePath()); - topologyDef = FluxParser.parseFile(filePath, dumpYaml, true); + topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, props); } http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index baeb4c4..40cfe57 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -17,6 +17,7 @@ */ package org.apache.storm.flux.parser; +import org.apache.storm.flux.api.TopologySource; import org.apache.storm.flux.model.BoltDef; import org.apache.storm.flux.model.IncludeDef; import org.apache.storm.flux.model.SpoutDef; @@ -27,48 +28,70 @@ import org.yaml.snakeyaml.TypeDescription; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; +import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.Properties; public class FluxParser { private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class); private FluxParser(){} - public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes) throws IOException { + // TODO refactor input stream processing (see parseResource() method). + public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, + Properties properties) throws IOException { Yaml yaml = yaml(); FileInputStream in = new FileInputStream(inputFile); - TopologyDef topology = loadYaml(yaml, in); + // TODO process properties, etc. + TopologyDef topology = loadYaml(yaml, in, properties); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology); + return processIncludes(yaml, topology, properties); } else { return topology; } } - public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes) throws IOException { + public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, + Properties properties) throws IOException { Yaml yaml = yaml(); InputStream in = FluxParser.class.getResourceAsStream(resource); - TopologyDef topology = loadYaml(yaml, in); + TopologyDef topology = loadYaml(yaml, in, properties); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology); + return processIncludes(yaml, topology, properties); } else { return topology; } } - private static TopologyDef loadYaml(Yaml yaml, InputStream in){ - return (TopologyDef)yaml.load(in); + private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + LOG.info("loading YAML from input stream..."); + int b = -1; + while((b = in.read()) != -1){ + bos.write(b); + } + String str = bos.toString(); + if(properties != null){ + LOG.info("Performing property substitution."); + for(Object key : properties.keySet()){ + str = str.replace("${" + key + "}", (String)properties.get(key)); + } + } else { + LOG.info("Not performing property substitution."); + } + return (TopologyDef)yaml.load(str); } private static void dumpYaml(TopologyDef topology, Yaml yaml){ @@ -94,17 +117,17 @@ public class FluxParser { * @param topologyDef the topology definition containing (possibly zero) includes * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef) throws IOException { + private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties props) throws IOException { //TODO support multiple levels of includes if(topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()){ TopologyDef includeTopologyDef = null; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); - includeTopologyDef = parseResource(include.getFile(), true, false); + includeTopologyDef = parseResource(include.getFile(), true, false, props); } else { LOG.info("Loading includes from file: {}", include.getFile()); - includeTopologyDef = parseFile(include.getFile(), true, false); + includeTopologyDef = parseFile(include.getFile(), true, false, props); } // if overrides are disabled, we won't replace anything that already exists http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java ---------------------------------------------------------------------- diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index e07dbd0..f84130b 100644 --- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -32,7 +32,7 @@ import static org.junit.Assert.*; public class TCKTest { @Test public void testTCK() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -42,7 +42,7 @@ public class TCKTest { @Test public void testShellComponents() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -52,7 +52,7 @@ public class TCKTest { @Test public void testKafkaSpoutConfig() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -62,7 +62,7 @@ public class TCKTest { @Test public void testLoadFromResource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -73,7 +73,7 @@ public class TCKTest { @Test public void testHdfs() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -83,7 +83,7 @@ public class TCKTest { @Test public void testIncludes() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -96,7 +96,7 @@ public class TCKTest { @Test public void testTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -107,7 +107,7 @@ public class TCKTest { @Test public void testTopologySourceWithReflection() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -118,7 +118,7 @@ public class TCKTest { @Test public void testTopologySourceWithConfigParam() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -129,7 +129,7 @@ public class TCKTest { @Test public void testTopologySourceWithMethodName() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -141,7 +141,7 @@ public class TCKTest { @Test public void testTridentTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -152,7 +152,7 @@ public class TCKTest { @Test(expected = IllegalArgumentException.class) public void testInvalidTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null); assertFalse("Topology config is invalid.", topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -162,7 +162,7 @@ public class TCKTest { @Test public void testTopologySourceWithGetMethodName() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -173,7 +173,7 @@ public class TCKTest { @Test public void testTopologySourceWithConfigMethods() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true); + TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/test/resources/configs/hdfs_test.yaml ---------------------------------------------------------------------- diff --git a/flux-core/src/test/resources/configs/hdfs_test.yaml b/flux-core/src/test/resources/configs/hdfs_test.yaml index 63441ab..c1d28d2 100644 --- a/flux-core/src/test/resources/configs/hdfs_test.yaml +++ b/flux-core/src/test/resources/configs/hdfs_test.yaml @@ -51,6 +51,12 @@ components: - name: "withFieldDelimiter" args: ["|"] + - id: "rotationAction" + className: "org.apache.storm.hdfs.common.rotation.MoveFileAction" + configMethods: + - name: "toDestination" + args: ["/tmp/dest2"] + # spout definitions spouts: - id: "spout-1" @@ -84,6 +90,8 @@ bolts: args: [ref: "rotationPolicy"] - name: "withSyncPolicy" args: [ref: "syncPolicy"] + - name: "addRotationAction" + args: [ref: "rotationAction"] parallelism: 1 # ...
