Repository: storm
Updated Branches:
  refs/heads/master a55bbbea8 -> 285d943b8


preliminary implementation for variable substitution


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b1d10ab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b1d10ab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b1d10ab

Branch: refs/heads/master
Commit: 5b1d10aba4118d854ca06840100b1e8da98bea34
Parents: adf91d9
Author: P. Taylor Goetz <[email protected]>
Authored: Mon Mar 30 01:40:32 2015 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Mon Mar 30 01:40:32 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java   | 18 +++++++-
 .../apache/storm/flux/parser/FluxParser.java    | 45 +++++++++++++++-----
 .../java/org/apache/storm/flux/TCKTest.java     | 28 ++++++------
 .../src/test/resources/configs/hdfs_test.yaml   |  8 ++++
 4 files changed, 72 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java 
b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 4583bae..3910e10 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Flux entry point.
@@ -52,6 +54,7 @@ public class Flux {
     private static final String OPTION_NO_SPLASH = "no-splash";
     private static final String OPTION_INACTIVE = "inactive";
     private static final String OPTION_ZOOKEEPER = "zookeeper";
+    private static final String OPTION_FILTER = "filter";
 
     public static void main(String[] args) throws Exception {
         Options options = new Options();
@@ -77,6 +80,9 @@ public class Flux {
         options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When 
running in local mode, use the ZooKeeper at the " +
                 "specified <host>:<port> instead of the in-process 
ZooKeeper."));
 
+        options.addOption(option(1, "f", OPTION_FILTER, "file", "Use the 
specified file as a source of properties, and " +
+                "perform variable substitution."));
+
         CommandLineParser parser = new BasicParser();
         CommandLine cmd = parser.parse(options, args);
 
@@ -116,13 +122,21 @@ public class Flux {
 
         TopologyDef topologyDef = null;
         String filePath = (String)cmd.getArgList().get(0);
+
+        // TODO conditionally load properties from a file or resource
+        Properties props = new Properties();
+        if(cmd.hasOption(OPTION_FILTER)){
+//            InputStream in = new FileInputStream();
+            props.load((InputStream)null);
+        }
+
         if(cmd.hasOption(OPTION_RESOURCE)){
             printf("Parsing classpath resource: %s", filePath);
-            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true);
+            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, 
props);
         } else {
             printf("Parsing file: %s",
                     new File(filePath).getAbsolutePath());
-            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true);
+            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, 
props);
         }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git 
a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java 
b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index baeb4c4..40cfe57 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.flux.parser;
 
+import org.apache.storm.flux.api.TopologySource;
 import org.apache.storm.flux.model.BoltDef;
 import org.apache.storm.flux.model.IncludeDef;
 import org.apache.storm.flux.model.SpoutDef;
@@ -27,48 +28,70 @@ import org.yaml.snakeyaml.TypeDescription;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
+import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Properties;
 
 public class FluxParser {
     private static final Logger LOG = 
LoggerFactory.getLogger(FluxParser.class);
 
     private FluxParser(){}
 
-    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, 
boolean processIncludes) throws IOException {
+    // TODO refactor input stream processing (see parseResource() method).
+    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, 
boolean processIncludes,
+                                        Properties properties) throws 
IOException {
         Yaml yaml = yaml();
         FileInputStream in = new FileInputStream(inputFile);
-        TopologyDef topology = loadYaml(yaml, in);
+        // TODO process properties, etc.
+        TopologyDef topology = loadYaml(yaml, in, properties);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology);
+            return processIncludes(yaml, topology, properties);
         } else {
             return topology;
         }
     }
 
-    public static TopologyDef parseResource(String resource, boolean dumpYaml, 
boolean processIncludes) throws IOException {
+    public static TopologyDef parseResource(String resource, boolean dumpYaml, 
boolean processIncludes,
+                                            Properties properties) throws 
IOException {
         Yaml yaml = yaml();
         InputStream in = FluxParser.class.getResourceAsStream(resource);
-        TopologyDef topology = loadYaml(yaml, in);
+        TopologyDef topology = loadYaml(yaml, in, properties);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology);
+            return processIncludes(yaml, topology, properties);
         } else {
             return topology;
         }
     }
 
-    private static TopologyDef loadYaml(Yaml yaml, InputStream in){
-        return (TopologyDef)yaml.load(in);
+    private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties 
properties) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        LOG.info("loading YAML from input stream...");
+        int b = -1;
+        while((b = in.read()) != -1){
+            bos.write(b);
+        }
+        String str = bos.toString();
+        if(properties != null){
+            LOG.info("Performing property substitution.");
+            for(Object key : properties.keySet()){
+                str = str.replace("${" + key + "}", 
(String)properties.get(key));
+            }
+        } else {
+            LOG.info("Not performing property substitution.");
+        }
+        return (TopologyDef)yaml.load(str);
     }
 
     private static void dumpYaml(TopologyDef topology, Yaml yaml){
@@ -94,17 +117,17 @@ public class FluxParser {
      * @param topologyDef the topology definition containing (possibly zero) 
includes
      * @return The TopologyDef with includes resolved.
      */
-    private static TopologyDef processIncludes(Yaml yaml, TopologyDef 
topologyDef) throws IOException {
+    private static TopologyDef processIncludes(Yaml yaml, TopologyDef 
topologyDef, Properties props) throws IOException {
         //TODO support multiple levels of includes
         if(topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()){
                 TopologyDef includeTopologyDef = null;
                 if (include.isResource()) {
                     LOG.info("Loading includes from resource: {}", 
include.getFile());
-                    includeTopologyDef = parseResource(include.getFile(), 
true, false);
+                    includeTopologyDef = parseResource(include.getFile(), 
true, false, props);
                 } else {
                     LOG.info("Loading includes from file: {}", 
include.getFile());
-                    includeTopologyDef = parseFile(include.getFile(), true, 
false);
+                    includeTopologyDef = parseFile(include.getFile(), true, 
false, props);
                 }
 
                 // if overrides are disabled, we won't replace anything that 
already exists

http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java 
b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index e07dbd0..f84130b 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.*;
 public class TCKTest {
     @Test
     public void testTCK() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/tck.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/tck.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -42,7 +42,7 @@ public class TCKTest {
 
     @Test
     public void testShellComponents() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/shell_test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/shell_test.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -52,7 +52,7 @@ public class TCKTest {
 
     @Test
     public void testKafkaSpoutConfig() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/kafka_test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -62,7 +62,7 @@ public class TCKTest {
 
     @Test
     public void testLoadFromResource() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/kafka_test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -73,7 +73,7 @@ public class TCKTest {
 
     @Test
     public void testHdfs() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/hdfs_test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -83,7 +83,7 @@ public class TCKTest {
 
     @Test
     public void testIncludes() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/include_test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/include_test.yaml", false, true, null);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -96,7 +96,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySource() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -107,7 +107,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithReflection() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, 
true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, 
true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -118,7 +118,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithConfigParam() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", 
false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", 
false, true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -129,7 +129,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithMethodName() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-method-override.yaml", 
false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-method-override.yaml", 
false, true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -141,7 +141,7 @@ public class TCKTest {
 
     @Test
     public void testTridentTopologySource() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, 
true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, 
true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -152,7 +152,7 @@ public class TCKTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testInvalidTopologySource() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, 
true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, 
true, null);
         assertFalse("Topology config is invalid.", topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -162,7 +162,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithGetMethodName() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, 
true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, 
true, null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -173,7 +173,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithConfigMethods() throws Exception {
-        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/config-methods-test.yaml", false, true);
+        TopologyDef topologyDef = 
FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, 
null);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/5b1d10ab/flux-core/src/test/resources/configs/hdfs_test.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/hdfs_test.yaml 
b/flux-core/src/test/resources/configs/hdfs_test.yaml
index 63441ab..c1d28d2 100644
--- a/flux-core/src/test/resources/configs/hdfs_test.yaml
+++ b/flux-core/src/test/resources/configs/hdfs_test.yaml
@@ -51,6 +51,12 @@ components:
       - name: "withFieldDelimiter"
         args: ["|"]
 
+  - id: "rotationAction"
+    className: "org.apache.storm.hdfs.common.rotation.MoveFileAction"
+    configMethods:
+      - name: "toDestination"
+        args: ["/tmp/dest2"]
+
 # spout definitions
 spouts:
   - id: "spout-1"
@@ -84,6 +90,8 @@ bolts:
         args: [ref: "rotationPolicy"]
       - name: "withSyncPolicy"
         args: [ref: "syncPolicy"]
+      - name: "addRotationAction"
+        args: [ref: "rotationAction"]
     parallelism: 1
     # ...
 

Reply via email to