merge flux into external/flux/
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b21a98dd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b21a98dd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b21a98dd Branch: refs/heads/master Commit: b21a98dd87f82a06a6295ab2bfd832c2810ca57e Parents: ea0fe12 b372a11 Author: P. Taylor Goetz <ptgo...@gmail.com> Authored: Wed May 6 13:31:04 2015 -0400 Committer: P. Taylor Goetz <ptgo...@gmail.com> Committed: Wed May 6 13:31:04 2015 -0400 ---------------------------------------------------------------------- external/flux/.gitignore | 15 + external/flux/LICENSE | 202 +++++ external/flux/README.md | 845 +++++++++++++++++++ external/flux/flux-core/pom.xml | 94 +++ .../main/java/org/apache/storm/flux/Flux.java | 263 ++++++ .../java/org/apache/storm/flux/FluxBuilder.java | 591 +++++++++++++ .../apache/storm/flux/api/TopologySource.java | 39 + .../org/apache/storm/flux/model/BeanDef.java | 39 + .../apache/storm/flux/model/BeanReference.java | 39 + .../org/apache/storm/flux/model/BoltDef.java | 24 + .../storm/flux/model/ConfigMethodDef.java | 62 ++ .../storm/flux/model/ExecutionContext.java | 77 ++ .../apache/storm/flux/model/GroupingDef.java | 77 ++ .../org/apache/storm/flux/model/IncludeDef.java | 54 ++ .../org/apache/storm/flux/model/ObjectDef.java | 90 ++ .../apache/storm/flux/model/PropertyDef.java | 58 ++ .../org/apache/storm/flux/model/SpoutDef.java | 24 + .../org/apache/storm/flux/model/StreamDef.java | 64 ++ .../apache/storm/flux/model/TopologyDef.java | 216 +++++ .../storm/flux/model/TopologySourceDef.java | 36 + .../org/apache/storm/flux/model/VertexDef.java | 36 + .../apache/storm/flux/parser/FluxParser.java | 202 +++++ .../flux-core/src/main/resources/splash.txt | 9 + .../org/apache/storm/flux/FluxBuilderTest.java | 31 + .../org/apache/storm/flux/IntegrationTest.java | 41 + .../java/org/apache/storm/flux/TCKTest.java | 234 +++++ .../multilang/MultilangEnvirontmentTest.java 
| 89 ++ .../apache/storm/flux/test/SimpleTopology.java | 42 + .../storm/flux/test/SimpleTopologySource.java | 35 + .../test/SimpleTopologyWithConfigParam.java | 38 + .../org/apache/storm/flux/test/TestBolt.java | 63 ++ .../storm/flux/test/TridentTopologySource.java | 54 ++ .../src/test/resources/configs/bad_hbase.yaml | 98 +++ .../resources/configs/config-methods-test.yaml | 70 ++ .../existing-topology-method-override.yaml | 10 + .../existing-topology-reflection-config.yaml | 9 + .../configs/existing-topology-reflection.yaml | 9 + .../configs/existing-topology-trident.yaml | 9 + .../resources/configs/existing-topology.yaml | 8 + .../src/test/resources/configs/hdfs_test.yaml | 97 +++ .../test/resources/configs/include_test.yaml | 25 + .../configs/invalid-existing-topology.yaml | 17 + .../src/test/resources/configs/kafka_test.yaml | 126 +++ .../src/test/resources/configs/shell_test.yaml | 104 +++ .../test/resources/configs/simple_hbase.yaml | 120 +++ .../resources/configs/substitution-test.yaml | 106 +++ .../src/test/resources/configs/tck.yaml | 95 +++ .../src/test/resources/configs/test.properties | 2 + .../flux-core/src/test/resources/logback.xml | 30 + external/flux/flux-examples/README.md | 68 ++ external/flux/flux-examples/pom.xml | 87 ++ .../storm/flux/examples/WordCountClient.java | 74 ++ .../apache/storm/flux/examples/WordCounter.java | 71 ++ .../src/main/resources/hbase_bolt.properties | 18 + .../src/main/resources/hdfs_bolt.properties | 26 + .../src/main/resources/kafka_spout.yaml | 136 +++ .../src/main/resources/multilang.yaml | 89 ++ .../src/main/resources/simple_hbase.yaml | 92 ++ .../src/main/resources/simple_hdfs.yaml | 105 +++ .../src/main/resources/simple_wordcount.yaml | 68 ++ external/flux/flux-ui/README.md | 3 + external/flux/flux-wrappers/pom.xml | 35 + .../flux/wrappers/bolts/FluxShellBolt.java | 56 ++ .../storm/flux/wrappers/bolts/LogInfoBolt.java | 44 + .../flux/wrappers/spouts/FluxShellSpout.java | 55 ++ 
.../main/resources/resources/randomsentence.js | 93 ++ .../main/resources/resources/splitsentence.py | 24 + .../src/main/resources/resources/storm.js | 373 ++++++++ .../src/main/resources/resources/storm.py | 260 ++++++ external/flux/pom.xml | 126 +++ 70 files changed, 6621 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/.gitignore ---------------------------------------------------------------------- diff --cc external/flux/.gitignore index 0000000,0000000..35fb1db new file mode 100644 --- /dev/null +++ b/external/flux/.gitignore @@@ -1,0 -1,0 +1,15 @@@ ++*.class ++**/target ++ ++# Package Files # ++*.jar ++*.war ++*.ear ++ ++# Intellij ++**/*.iml ++**/*.ipr ++**/*.iws ++ ++# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml ++hs_err_pid* http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/LICENSE ---------------------------------------------------------------------- diff --cc external/flux/LICENSE index 0000000,0000000..e06d208 new file mode 100644 --- /dev/null +++ b/external/flux/LICENSE @@@ -1,0 -1,0 +1,202 @@@ ++Apache License ++ Version 2.0, January 2004 ++ http://www.apache.org/licenses/ ++ ++ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION ++ ++ 1. Definitions. ++ ++ "License" shall mean the terms and conditions for use, reproduction, ++ and distribution as defined by Sections 1 through 9 of this document. ++ ++ "Licensor" shall mean the copyright owner or entity authorized by ++ the copyright owner that is granting the License. ++ ++ "Legal Entity" shall mean the union of the acting entity and all ++ other entities that control, are controlled by, or are under common ++ control with that entity. 
For the purposes of this definition, ++ "control" means (i) the power, direct or indirect, to cause the ++ direction or management of such entity, whether by contract or ++ otherwise, or (ii) ownership of fifty percent (50%) or more of the ++ outstanding shares, or (iii) beneficial ownership of such entity. ++ ++ "You" (or "Your") shall mean an individual or Legal Entity ++ exercising permissions granted by this License. ++ ++ "Source" form shall mean the preferred form for making modifications, ++ including but not limited to software source code, documentation ++ source, and configuration files. ++ ++ "Object" form shall mean any form resulting from mechanical ++ transformation or translation of a Source form, including but ++ not limited to compiled object code, generated documentation, ++ and conversions to other media types. ++ ++ "Work" shall mean the work of authorship, whether in Source or ++ Object form, made available under the License, as indicated by a ++ copyright notice that is included in or attached to the work ++ (an example is provided in the Appendix below). ++ ++ "Derivative Works" shall mean any work, whether in Source or Object ++ form, that is based on (or derived from) the Work and for which the ++ editorial revisions, annotations, elaborations, or other modifications ++ represent, as a whole, an original work of authorship. For the purposes ++ of this License, Derivative Works shall not include works that remain ++ separable from, or merely link (or bind by name) to the interfaces of, ++ the Work and Derivative Works thereof. ++ ++ "Contribution" shall mean any work of authorship, including ++ the original version of the Work and any modifications or additions ++ to that Work or Derivative Works thereof, that is intentionally ++ submitted to Licensor for inclusion in the Work by the copyright owner ++ or by an individual or Legal Entity authorized to submit on behalf of ++ the copyright owner. 
For the purposes of this definition, "submitted" ++ means any form of electronic, verbal, or written communication sent ++ to the Licensor or its representatives, including but not limited to ++ communication on electronic mailing lists, source code control systems, ++ and issue tracking systems that are managed by, or on behalf of, the ++ Licensor for the purpose of discussing and improving the Work, but ++ excluding communication that is conspicuously marked or otherwise ++ designated in writing by the copyright owner as "Not a Contribution." ++ ++ "Contributor" shall mean Licensor and any individual or Legal Entity ++ on behalf of whom a Contribution has been received by Licensor and ++ subsequently incorporated within the Work. ++ ++ 2. Grant of Copyright License. Subject to the terms and conditions of ++ this License, each Contributor hereby grants to You a perpetual, ++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable ++ copyright license to reproduce, prepare Derivative Works of, ++ publicly display, publicly perform, sublicense, and distribute the ++ Work and such Derivative Works in Source or Object form. ++ ++ 3. Grant of Patent License. Subject to the terms and conditions of ++ this License, each Contributor hereby grants to You a perpetual, ++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable ++ (except as stated in this section) patent license to make, have made, ++ use, offer to sell, sell, import, and otherwise transfer the Work, ++ where such license applies only to those patent claims licensable ++ by such Contributor that are necessarily infringed by their ++ Contribution(s) alone or by combination of their Contribution(s) ++ with the Work to which such Contribution(s) was submitted. 
If You ++ institute patent litigation against any entity (including a ++ cross-claim or counterclaim in a lawsuit) alleging that the Work ++ or a Contribution incorporated within the Work constitutes direct ++ or contributory patent infringement, then any patent licenses ++ granted to You under this License for that Work shall terminate ++ as of the date such litigation is filed. ++ ++ 4. Redistribution. You may reproduce and distribute copies of the ++ Work or Derivative Works thereof in any medium, with or without ++ modifications, and in Source or Object form, provided that You ++ meet the following conditions: ++ ++ (a) You must give any other recipients of the Work or ++ Derivative Works a copy of this License; and ++ ++ (b) You must cause any modified files to carry prominent notices ++ stating that You changed the files; and ++ ++ (c) You must retain, in the Source form of any Derivative Works ++ that You distribute, all copyright, patent, trademark, and ++ attribution notices from the Source form of the Work, ++ excluding those notices that do not pertain to any part of ++ the Derivative Works; and ++ ++ (d) If the Work includes a "NOTICE" text file as part of its ++ distribution, then any Derivative Works that You distribute must ++ include a readable copy of the attribution notices contained ++ within such NOTICE file, excluding those notices that do not ++ pertain to any part of the Derivative Works, in at least one ++ of the following places: within a NOTICE text file distributed ++ as part of the Derivative Works; within the Source form or ++ documentation, if provided along with the Derivative Works; or, ++ within a display generated by the Derivative Works, if and ++ wherever such third-party notices normally appear. The contents ++ of the NOTICE file are for informational purposes only and ++ do not modify the License. 
You may add Your own attribution ++ notices within Derivative Works that You distribute, alongside ++ or as an addendum to the NOTICE text from the Work, provided ++ that such additional attribution notices cannot be construed ++ as modifying the License. ++ ++ You may add Your own copyright statement to Your modifications and ++ may provide additional or different license terms and conditions ++ for use, reproduction, or distribution of Your modifications, or ++ for any such Derivative Works as a whole, provided Your use, ++ reproduction, and distribution of the Work otherwise complies with ++ the conditions stated in this License. ++ ++ 5. Submission of Contributions. Unless You explicitly state otherwise, ++ any Contribution intentionally submitted for inclusion in the Work ++ by You to the Licensor shall be under the terms and conditions of ++ this License, without any additional terms or conditions. ++ Notwithstanding the above, nothing herein shall supersede or modify ++ the terms of any separate license agreement you may have executed ++ with Licensor regarding such Contributions. ++ ++ 6. Trademarks. This License does not grant permission to use the trade ++ names, trademarks, service marks, or product names of the Licensor, ++ except as required for reasonable and customary use in describing the ++ origin of the Work and reproducing the content of the NOTICE file. ++ ++ 7. Disclaimer of Warranty. Unless required by applicable law or ++ agreed to in writing, Licensor provides the Work (and each ++ Contributor provides its Contributions) on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or ++ implied, including, without limitation, any warranties or conditions ++ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A ++ PARTICULAR PURPOSE. 
You are solely responsible for determining the ++ appropriateness of using or redistributing the Work and assume any ++ risks associated with Your exercise of permissions under this License. ++ ++ 8. Limitation of Liability. In no event and under no legal theory, ++ whether in tort (including negligence), contract, or otherwise, ++ unless required by applicable law (such as deliberate and grossly ++ negligent acts) or agreed to in writing, shall any Contributor be ++ liable to You for damages, including any direct, indirect, special, ++ incidental, or consequential damages of any character arising as a ++ result of this License or out of the use or inability to use the ++ Work (including but not limited to damages for loss of goodwill, ++ work stoppage, computer failure or malfunction, or any and all ++ other commercial damages or losses), even if such Contributor ++ has been advised of the possibility of such damages. ++ ++ 9. Accepting Warranty or Additional Liability. While redistributing ++ the Work or Derivative Works thereof, You may choose to offer, ++ and charge a fee for, acceptance of support, warranty, indemnity, ++ or other liability obligations and/or rights consistent with this ++ License. However, in accepting such obligations, You may act only ++ on Your own behalf and on Your sole responsibility, not on behalf ++ of any other Contributor, and only if You agree to indemnify, ++ defend, and hold each Contributor harmless for any liability ++ incurred by, or claims asserted against, such Contributor by reason ++ of your accepting any such warranty or additional liability. ++ ++ END OF TERMS AND CONDITIONS ++ ++ APPENDIX: How to apply the Apache License to your work. ++ ++ To apply the Apache License to your work, attach the following ++ boilerplate notice, with the fields enclosed by brackets "{}" ++ replaced with your own identifying information. (Don't include ++ the brackets!) 
The text should be enclosed in the appropriate ++ comment syntax for the file format. We also recommend that a ++ file or class name and description of purpose be included on the ++ same "printed page" as the copyright notice for easier ++ identification within third-party archives. ++ ++ Copyright {yyyy} {name of copyright owner} ++ ++ Licensed under the Apache License, Version 2.0 (the "License"); ++ you may not use this file except in compliance with the License. ++ You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. ++ http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/README.md ---------------------------------------------------------------------- diff --cc external/flux/README.md index 0000000,0000000..6f27219 new file mode 100644 --- /dev/null +++ b/external/flux/README.md @@@ -1,0 -1,0 +1,845 @@@ ++# flux ++A framework for creating and deploying Apache Storm streaming computations with less friction. ++ ++## Definition ++**flux** |flÉks| _noun_ ++ ++1. The action or process of flowing or flowing out ++2. Continuous change ++3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area ++4. A substance mixed with a solid to lower its melting point ++ ++## Rationale ++Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in ++order to change configuration. ++ ++## About ++Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and ++deveoper-intensive. 
++ ++Have you ever found yourself repeating this pattern?: ++ ++```java ++ ++public static void main(String[] args) throws Exception { ++ // logic to determine if we're running locally or not... ++ // create necessary config options... ++ boolean runLocal = shouldRunLocal(); ++ if(runLocal){ ++ LocalCluster cluster = new LocalCluster(); ++ cluster.submitTopology(name, conf, topology); ++ } else { ++ StormSubmitter.submitTopology(name, conf, topology); ++ } ++} ++``` ++ ++Wouldn't something like this be easier: ++ ++```bash ++storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml ++``` ++ ++or: ++ ++```bash ++storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml ++``` ++ ++Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code, ++and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that ++pain by allowing you to package all your Storm components in a single jar, and use an external text file to define ++the layout and configuration of your topologies. ++ ++## Features ++ ++ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration ++ in your topology code ++ * Support for existing topology code (see below) ++ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL ++ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.) ++ * Convenient support for multi-lang components ++ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style ++ `${variable.name}` substitution) ++ ++## Usage ++ ++To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document ++to define your topology (see below for YAML configuration options). 
++ ++### Building from Source ++The easiest way to use Flux, is to add it as a Maven dependency in you project as described below. ++ ++If you would like to build Flux from source and run the unit/integration tests, you will need the following installed ++on your system: ++ ++* Python 2.6.x or later ++* Node.js 0.10.x or later ++ ++#### Building with unit tests enabled: ++ ++``` ++mvn clean install ++``` ++ ++#### Building with unit tests disabled: ++If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests: ++ ++``` ++mvn clean install -DskipTests=true ++``` ++ ++Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python ++installed since it is required by Apache Storm. ++ ++ ++#### Building with integration tests enabled: ++ ++``` ++mvn clean install -DskipIntegration=false ++``` ++ ++ ++### Packaging with Maven ++To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm ++topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not ++recommended). ++ ++#### Flux Maven Dependency ++The current version of Flux is available in Maven Central at the following coordinates: ++```xml ++<dependency> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-core</artifactId> ++ <version>0.3.0</version> ++</dependency> ++``` ++ ++#### Creating a Flux-Enabled Topology JAR ++The example below illustrates Flux usage with the Maven shade plugin: ++ ++ ```xml ++<!-- include Flux and user dependencies in the shaded jar --> ++<dependencies> ++ <!-- Flux include --> ++ <dependency> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-core</artifactId> ++ <version>0.3.0</version> ++ </dependency> ++ ++ <!-- add user dependencies here... 
--> ++ ++</dependencies> ++<!-- create a fat jar that includes all dependencies --> ++<build> ++ <plugins> ++ <plugin> ++ <groupId>org.apache.maven.plugins</groupId> ++ <artifactId>maven-shade-plugin</artifactId> ++ <version>1.4</version> ++ <configuration> ++ <createDependencyReducedPom>true</createDependencyReducedPom> ++ </configuration> ++ <executions> ++ <execution> ++ <phase>package</phase> ++ <goals> ++ <goal>shade</goal> ++ </goals> ++ <configuration> ++ <transformers> ++ <transformer ++ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> ++ <transformer ++ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> ++ <mainClass>org.apache.storm.flux.Flux</mainClass> ++ </transformer> ++ </transformers> ++ </configuration> ++ </execution> ++ </executions> ++ </plugin> ++ </plugins> ++</build> ++ ``` ++ ++### Deploying and Running a Flux Topology ++Once your topology components are packaged with the Flux dependency, you can run different topologies either locally ++or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you ++could run it locally with the command: ++ ++ ++```bash ++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml ++ ++``` ++ ++### Command line options ++``` ++usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux ++ [options] <topology-config.yaml> ++ -d,--dry-run Do not run or deploy the topology. Just ++ build, validate, and print information about ++ the topology. ++ -e,--env-filter Perform environment variable substitution. ++ Replace keysidentified with `${ENV-[NAME]}` ++ will be replaced with the corresponding ++ `NAME` environment value ++ -f,--filter <file> Perform property substitution. Use the ++ specified file as a source of properties, ++ and replace keys identified with {$[property ++ name]} with the value defined in the ++ properties file. 
++ -i,--inactive Deploy the topology, but do not activate it. ++ -l,--local Run the topology in local mode. ++ -n,--no-splash Suppress the printing of the splash screen. ++ -q,--no-detail Suppress the printing of topology details. ++ -r,--remote Deploy the topology to a remote cluster. ++ -R,--resource Treat the supplied path as a classpath ++ resource instead of a file. ++ -s,--sleep <ms> When running locally, the amount of time to ++ sleep (in ms.) before killing the topology ++ and shutting down the local cluster. ++ -z,--zookeeper <host:port> When running in local mode, use the ++ ZooKeeper at the specified <host>:<port> ++ instead of the in-process ZooKeeper. ++ (requires Storm 0.9.3 or later) ++``` ++ ++**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line ++switches to pass through to the `storm` command. ++ ++For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following ++example command will run Flux and override the `nimus.host` configuration: ++ ++```bash ++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost ++``` ++ ++### Sample output ++``` ++âââââââââââ âââ ââââââ âââ ++âââââââââââ âââ âââââââââââ ++ââââââ âââ âââ âââ ââââââ ++ââââââ âââ âââ âââ ââââââ ++âââ âââââââââââââââââââââ âââ ++âââ ââââââââ âââââââ âââ âââ +++- Apache Storm -+ +++- data FLow User eXperience -+ ++Version: 0.3.0 ++Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml ++---------- TOPOLOGY DETAILS ---------- ++Name: shell-topology ++--------------- SPOUTS --------------- ++sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout) ++---------------- BOLTS --------------- ++splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt) ++log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt) ++count[1](backtype.storm.testing.TestWordCounter) ++--------------- 
STREAMS --------------- ++sentence-spout --SHUFFLE--> splitsentence ++splitsentence --FIELDS--> count ++count --SHUFFLE--> log ++-------------------------------------- ++Submitting topology: 'shell-topology' to remote cluster... ++``` ++ ++## YAML Configuration ++Flux topologies are defined in a YAML file that describes a topology. A Flux topology ++definition consists of the following: ++ ++ 1. A topology name ++ 2. A list of topology "components" (named Java objects that will be made available in the environment) ++ 3. **EITHER** (A DSL topology definition): ++ * A list of spouts, each identified by a unique ID ++ * A list of bolts, each identified by a unique ID ++ * A list of "stream" objects representing a flow of tuples between spouts and bolts ++ 4. **OR** (A JVM class that can produce a `backtype.storm.generated.StormTopology` instance: ++ * A `topologySource` definition. ++ ++ ++ ++For example, here is a simple definition of a wordcount topology using the YAML DSL: ++ ++```yaml ++name: "yaml-topology" ++config: ++ topology.workers: 1 ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ ++# bolt definitions ++bolts: ++ - id: "bolt-1" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ - id: "bolt-2" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ ++#stream definitions ++streams: ++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) ++ from: "spout-1" ++ to: "bolt-1" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "bolt-1 --> bolt2" ++ from: "bolt-1" ++ to: "bolt-2" ++ grouping: ++ type: SHUFFLE ++ ++ ++``` ++## Property Substitution/Filtering ++It's common for developers to want to easily switch between configurations, for example switching deployment between ++a development environment and a production environment. 
This can be accomplished by using separate YAML configuration ++files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology ++does not change, but configuration settings such as host names, ports, and parallelism paramters do. ++ ++For this case, Flux offers properties filtering to allow you two externalize values to a `.properties` file and have ++them substituted before the `.yaml` file is parsed. ++ ++To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example, ++if you invoked flux like so: ++ ++```bash ++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties ++``` ++With the following `dev.properties` file: ++ ++```properties ++kafka.zookeeper.hosts: localhost:2181 ++``` ++ ++You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax: ++ ++```yaml ++ - id: "zkHosts" ++ className: "storm.kafka.ZkHosts" ++ constructorArgs: ++ - "${kafka.zookeeper.hosts}" ++``` ++ ++In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents. ++ ++### Environment Variable Substitution/Filtering ++Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` if defined, ++you can reference it in a Flux YAML file with the following syntax: ++ ++``` ++${ENV-ZK_HOSTS} ++``` ++ ++## Components ++Components are essentially named object instances that are made available as configuration options for spouts and ++bolts. If you are familiar with the Spring framework, components are roughly analagous to Spring beans. ++ ++Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example, ++the following will make an instance of the `storm.kafka.StringScheme` class available as a reference under the key ++`"stringScheme"` . 
This assumes the `storm.kafka.StringScheme` has a default constructor. ++ ++```yaml ++components: ++ - id: "stringScheme" ++ className: "storm.kafka.StringScheme" ++``` ++ ++### Contructor Arguments, References, Properties and Configuration Methods ++ ++####Constructor Arguments ++Arguments to a class constructor can be configured by adding a `contructorArgs` element to a components. ++`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an ++object by calling the constructor that takes a single string as an argument: ++ ++```yaml ++ - id: "zkHosts" ++ className: "storm.kafka.ZkHosts" ++ constructorArgs: ++ - "localhost:2181" ++``` ++ ++####References ++Each component instance is identified by a unique id that allows it to be used/reused by other components. To ++reference an existing component, you specify the id of the component with the `ref` tag. ++ ++In the following example, a component with the id `"stringScheme"` is created, and later referenced, as a an argument ++to another component's constructor: ++ ++```yaml ++components: ++ - id: "stringScheme" ++ className: "storm.kafka.StringScheme" ++ ++ - id: "stringMultiScheme" ++ className: "backtype.storm.spout.SchemeAsMultiScheme" ++ constructorArgs: ++ - ref: "stringScheme" # component with id "stringScheme" must be declared above. ++``` ++**N.B.:** References can only be used after (below) the object they point to has been declared. 
++ ++####Properties ++In addition to calling constructors with different arguments, Flux also allows you to configure components using ++JavaBean-like setter methods and fields declared as `public`: ++ ++```yaml ++ - id: "spoutConfig" ++ className: "storm.kafka.SpoutConfig" ++ constructorArgs: ++ # brokerHosts ++ - ref: "zkHosts" ++ # topic ++ - "myKafkaTopic" ++ # zkRoot ++ - "/kafkaSpout" ++ # id ++ - "myId" ++ properties: ++ - name: "forceFromStart" ++ value: true ++ - name: "scheme" ++ ref: "stringMultiScheme" ++``` ++ ++In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with ++the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then ++look for a public instance variable with the name `forceFromStart` and attempt to set its value. ++ ++References may also be used as property values. ++ ++####Configuration Methods ++Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an ++arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that ++don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes ++that use the builder pattern for configuration/composition. 
++ ++The following YAML example creates a bolt and configures it by calling several methods: ++ ++```yaml ++bolts: ++ - id: "bolt-1" ++ className: "org.apache.storm.flux.test.TestBolt" ++ parallelism: 1 ++ configMethods: ++ - name: "withFoo" ++ args: ++ - "foo" ++ - name: "withBar" ++ args: ++ - "bar" ++ - name: "withFooBar" ++ args: ++ - "foo" ++ - "bar" ++``` ++ ++The signatures of the corresponding methods are as follows: ++ ++```java ++ public void withFoo(String foo); ++ public void withBar(String bar); ++ public void withFooBar(String foo, String bar); ++``` ++ ++Arguments passed to configuration methods work much the same way as constructor arguments, and support references as ++well. ++ ++### Using Java `enum`s in Contructor Arguments, References, Properties and Configuration Methods ++You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`. ++ ++For example, [Storm's HDFS module]() includes the following `enum` definition (simplified for brevity): ++ ++```java ++public static enum Units { ++ KB, MB, GB, TB ++} ++``` ++ ++And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor: ++ ++```java ++public FileSizeRotationPolicy(float count, Units units) ++ ++``` ++The following Flux `component` definition could be used to call the constructor: ++ ++```yaml ++ - id: "rotationPolicy" ++ className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy" ++ constructorArgs: ++ - 5.0 ++ - MB ++``` ++ ++The above definition is functionally equivalent to the following Java code: ++ ++```java ++// rotate files when they reach 5MB ++FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); ++``` ++ ++## Topology Config ++The `config` section is simply a map of Storm topology configuration parameters that will be passed to the ++`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` class: ++ ++```yaml ++config: ++ 
topology.workers: 4 ++ topology.max.spout.pending: 1000 ++ topology.message.timeout.secs: 30 ++``` ++ ++# Existing Topologies ++If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to ++leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology ++classes. ++ ++The easiest way to use an existing topology class is to define ++a `getTopology()` instance method with one of the following signatures: ++ ++```java ++public StormTopology getTopology(Map<String, Object> config) ++``` ++or: ++ ++```java ++public StormTopology getTopology(Config config) ++``` ++ ++You could then use the following YAML to configure your topology: ++ ++```yaml ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopology" ++``` ++ ++If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can ++override it: ++ ++```yaml ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopology" ++ methodName: "getTopologyWithDifferentMethodName" ++``` ++ ++__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or ++`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` object. ++ ++# YAML DSL ++## Spouts and Bolts ++Spout and Bolts are configured in their own respective section of the YAML configuration. Spout and Bolt definitions ++are extensions to the `component` definition that add a `parallelism` parameter that sets the parallelism for a ++component when the topology is deployed. ++ ++Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as ++well. 
++ ++Shell spout example: ++ ++```yaml ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.spouts.GenericShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++``` ++ ++Kafka spout example: ++ ++```yaml ++components: ++ - id: "stringScheme" ++ className: "storm.kafka.StringScheme" ++ ++ - id: "stringMultiScheme" ++ className: "backtype.storm.spout.SchemeAsMultiScheme" ++ constructorArgs: ++ - ref: "stringScheme" ++ ++ - id: "zkHosts" ++ className: "storm.kafka.ZkHosts" ++ constructorArgs: ++ - "localhost:2181" ++ ++# Alternative kafka config ++# - id: "kafkaConfig" ++# className: "storm.kafka.KafkaConfig" ++# constructorArgs: ++# # brokerHosts ++# - ref: "zkHosts" ++# # topic ++# - "myKafkaTopic" ++# # clientId (optional) ++# - "myKafkaClientId" ++ ++ - id: "spoutConfig" ++ className: "storm.kafka.SpoutConfig" ++ constructorArgs: ++ # brokerHosts ++ - ref: "zkHosts" ++ # topic ++ - "myKafkaTopic" ++ # zkRoot ++ - "/kafkaSpout" ++ # id ++ - "myId" ++ properties: ++ - name: "forceFromStart" ++ value: true ++ - name: "scheme" ++ ref: "stringMultiScheme" ++ ++config: ++ topology.workers: 1 ++ ++# spout definitions ++spouts: ++ - id: "kafka-spout" ++ className: "storm.kafka.KafkaSpout" ++ constructorArgs: ++ - ref: "spoutConfig" ++ ++``` ++ ++Bolt Examples: ++ ++```yaml ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.bolts.GenericShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... 
++``` ++## Streams and Stream Groupings ++Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in ++a topology, with an associated Grouping definition. ++ ++A Stream definition has the following properties: ++ ++**`name`:** A name for the connection (optional, currently unused) ++ ++**`from`:** The `id` of a Spout or Bolt that is the source (publisher) ++ ++**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber) ++ ++**`grouping`:** The stream grouping definition for the Stream ++ ++A Grouping definition has the following properties: ++ ++**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`. ++ ++**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream) ++ ++**`args`:** For the `FIELDS` grouping, a list of field names. ++ ++**`customClass`** For the `CUSTOM` grouping, a definition of custom grouping class instance ++ ++The `streams` definition example below sets up a topology with the following wiring: ++ ++``` ++ kafka-spout --> splitsentence --> count --> log ++``` ++ ++ ++```yaml ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "kafka-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE ++``` ++ ++### Custom Stream Groupings ++Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter ++that tells Flux how to instantiate the custom class. 
The `customClass` definition extends `component`, so it supports ++constructor arguments, references, and properties as well. ++ ++The example below creates a Stream with an instance of the `backtype.storm.testing.NGrouping` custom stream grouping ++class. ++ ++```yaml ++ - name: "bolt-1 --> bolt-2" ++ from: "bolt-1" ++ to: "bolt-2" ++ grouping: ++ type: CUSTOM ++ customClass: ++ className: "backtype.storm.testing.NGrouping" ++ constructorArgs: ++ - 1 ++``` ++ ++## Includes and Overrides ++Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the ++same file. Includes may be either files, or classpath resources. ++ ++Includes are specified as a list of maps: ++ ++```yaml ++includes: ++ - resource: false ++ file: "src/test/resources/configs/shell_test.yaml" ++ override: false ++``` ++ ++If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the ++`file` attribute, otherwise it will be treated as a regular file. ++ ++The `override` property controls how includes affect the values defined in the current file. If `override` is set to ++`true`, values in the included file will replace values in the current file being parsed. If `override` is set to ++`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them. ++ ++**N.B.:** Includes are not yet recursive. Includes from included files will be ignored. 
++ ++ ++## Basic Word Count Example ++ ++This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java. ++ ++Topology YAML config: ++ ++```yaml ++--- ++name: "shell-topology" ++config: ++ topology.workers: 1 ++ ++# spout definitions ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.spouts.GenericShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.bolts.GenericShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "sentence-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE ++``` ++ ++ ++## Micro-Batching (Trident) API Support ++Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned. 
++ ++To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config: ++ ++```yaml ++name: "my-trident-topology" ++ ++config: ++ topology.workers: 1 ++ ++topologySource: ++ className: "org.apache.storm.flux.test.TridentTopologySource" ++ # Flux will look for "getTopology", this will override that. ++ methodName: "getTopologyWithDifferentMethodName" ++``` ++ ++## Author ++P. Taylor Goetz ++ ++## Contributors ++ ++ ++## Contributing ++ ++Contributions in any form are more than welcome. ++ ++The intent of this project is that it will be donated to Apache Storm. ++ ++By offering any contributions to this project, you should be willing and able to submit an ++[Apache ICLA](http://www.apache.org/licenses/icla.txt), if you have not done so already. http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/pom.xml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/pom.xml index 0000000,0000000..600613d new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/pom.xml @@@ -1,0 -1,0 +1,94 @@@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ Licensed to the Apache Software Foundation (ASF) under one or more ++ contributor license agreements. See the NOTICE file distributed with ++ this work for additional information regarding copyright ownership. ++ The ASF licenses this file to You under the Apache License, Version 2.0 ++ (the "License"); you may not use this file except in compliance with ++ the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. 
++--> ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <parent> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux</artifactId> ++ <version>0.3.1-SNAPSHOT</version> ++ <relativePath>../pom.xml</relativePath> ++ </parent> ++ ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-core</artifactId> ++ <packaging>jar</packaging> ++ ++ <name>flux-core</name> ++ <url>https://github.com/ptgoetz/flux</url> ++ ++ <dependencies> ++ <dependency> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-wrappers</artifactId> ++ <version>${project.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-kafka</artifactId> ++ <version>${storm.version}</version> ++ <scope>test</scope> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-hdfs</artifactId> ++ <version>${storm.version}</version> ++ <scope>test</scope> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-hbase</artifactId> ++ <version>${storm.version}</version> ++ <scope>test</scope> ++ </dependency> ++ </dependencies> ++ <build> ++ <resources> ++ <resource> ++ <directory>src/main/resources</directory> ++ <filtering>true</filtering> ++ </resource> ++ </resources> ++ <plugins> ++ <plugin> ++ <groupId>org.apache.maven.plugins</groupId> ++ <artifactId>maven-shade-plugin</artifactId> ++ <version>1.4</version> ++ <configuration> ++ <createDependencyReducedPom>true</createDependencyReducedPom> ++ </configuration> ++ <executions> ++ <execution> ++ <phase>package</phase> ++ <goals> ++ <goal>shade</goal> ++ </goals> ++ <configuration> ++ <transformers> ++ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> ++ <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> ++ <mainClass>org.apache.storm.flux.Flux</mainClass> ++ </transformer> ++ </transformers> ++ </configuration> ++ </execution> ++ </executions> ++ </plugin> ++ </plugins> ++ </build> ++</project> http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java index 0000000,0000000..6300631 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@@ -1,0 -1,0 +1,263 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.storm.flux; ++ ++import backtype.storm.Config; ++import backtype.storm.LocalCluster; ++import backtype.storm.StormSubmitter; ++import backtype.storm.generated.StormTopology; ++import backtype.storm.generated.SubmitOptions; ++import backtype.storm.generated.TopologyInitialStatus; ++import backtype.storm.utils.Utils; ++import org.apache.commons.cli.*; ++import org.apache.storm.flux.model.*; ++import org.apache.storm.flux.parser.FluxParser; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import java.io.*; ++import java.util.Map; ++import java.util.Properties; ++ ++/** ++ * Flux entry point. ++ * ++ */ ++public class Flux { ++ private static final Logger LOG = LoggerFactory.getLogger(Flux.class); ++ ++ private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l; ++ ++ private static final Long DEFAULT_ZK_PORT = 2181l; ++ ++ private static final String OPTION_LOCAL = "local"; ++ private static final String OPTION_REMOTE = "remote"; ++ private static final String OPTION_RESOURCE = "resource"; ++ private static final String OPTION_SLEEP = "sleep"; ++ private static final String OPTION_DRY_RUN = "dry-run"; ++ private static final String OPTION_NO_DETAIL = "no-detail"; ++ private static final String OPTION_NO_SPLASH = "no-splash"; ++ private static final String OPTION_INACTIVE = "inactive"; ++ private static final String OPTION_ZOOKEEPER = "zookeeper"; ++ private static final String OPTION_FILTER = "filter"; ++ private static final String OPTION_ENV_FILTER = "env-filter"; ++ ++ public static void main(String[] args) throws Exception { ++ Options options = new Options(); ++ ++ options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode.")); ++ ++ options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster.")); ++ ++ options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file.")); ++ ++ options.addOption(option(1, "s", 
OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) " + ++ "before killing the topology and shutting down the local cluster.")); ++ ++ options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, " + ++ "and print information about the topology.")); ++ ++ options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details.")); ++ ++ options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen.")); ++ ++ options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it.")); ++ ++ options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " + ++ "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)")); ++ ++ options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " + ++ "as a source of properties, and replace keys identified with {$[property name]} with the value defined " + ++ "in the properties file.")); ++ ++ options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. 
Replace keys" + ++ "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value")); ++ ++ CommandLineParser parser = new BasicParser(); ++ CommandLine cmd = parser.parse(options, args); ++ ++ if (cmd.getArgs().length != 1) { ++ usage(options); ++ System.exit(1); ++ } ++ runCli(cmd); ++ } ++ ++ private static Option option(int argCount, String shortName, String longName, String description){ ++ return option(argCount, shortName, longName, longName, description); ++ } ++ ++ private static Option option(int argCount, String shortName, String longName, String argName, String description){ ++ Option option = OptionBuilder.hasArgs(argCount) ++ .withArgName(argName) ++ .withLongOpt(longName) ++ .withDescription(description) ++ .create(shortName); ++ return option; ++ } ++ ++ private static void usage(Options options) { ++ HelpFormatter formatter = new HelpFormatter(); ++ formatter.printHelp("storm jar <my_topology_uber_jar.jar> " + ++ Flux.class.getName() + ++ " [options] <topology-config.yaml>", options); ++ } ++ ++ private static void runCli(CommandLine cmd)throws Exception { ++ if(!cmd.hasOption(OPTION_NO_SPLASH)) { ++ printSplash(); ++ } ++ ++ boolean dumpYaml = cmd.hasOption("dump-yaml"); ++ ++ TopologyDef topologyDef = null; ++ String filePath = (String)cmd.getArgList().get(0); ++ ++ // TODO conditionally load properties from a file our resource ++ String filterProps = null; ++ if(cmd.hasOption(OPTION_FILTER)){ ++ filterProps = cmd.getOptionValue(OPTION_FILTER); ++ } ++ ++ ++ boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER); ++ if(cmd.hasOption(OPTION_RESOURCE)){ ++ printf("Parsing classpath resource: %s", filePath); ++ topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter); ++ } else { ++ printf("Parsing file: %s", ++ new File(filePath).getAbsolutePath()); ++ topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter); ++ } ++ ++ ++ String topologyName = 
topologyDef.getName(); ++ // merge contents of `config` into topology config ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ ++ if(!cmd.hasOption(OPTION_NO_DETAIL)){ ++ printTopologyInfo(context); ++ } ++ ++ if(!cmd.hasOption(OPTION_DRY_RUN)) { ++ if (cmd.hasOption(OPTION_REMOTE)) { ++ LOG.info("Running remotely..."); ++ try { ++ // should the topology be active or inactive ++ SubmitOptions submitOptions = null; ++ if(cmd.hasOption(OPTION_INACTIVE)){ ++ LOG.info("Deploying topology in an INACTIVE state..."); ++ submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE); ++ } else { ++ LOG.info("Deploying topology in an ACTIVE state..."); ++ submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE); ++ } ++ StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null); ++ } catch (Exception e) { ++ LOG.warn("Unable to deploy topology to remote cluster.", e); ++ } ++ } else { ++ LOG.info("Running in local mode..."); ++ ++ String sleepStr = cmd.getOptionValue(OPTION_SLEEP); ++ Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME; ++ if (sleepStr != null) { ++ sleepTime = Long.parseLong(sleepStr); ++ } ++ LOG.debug("Sleep time: {}", sleepTime); ++ LocalCluster cluster = null; ++ ++ // in-process or external zookeeper ++ if(cmd.hasOption(OPTION_ZOOKEEPER)){ ++ String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER); ++ LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr); ++ long zkPort = DEFAULT_ZK_PORT; ++ String zkHost = null; ++ if(zkStr.contains(":")){ ++ String[] hostPort = zkStr.split(":"); ++ zkHost = hostPort[0]; ++ zkPort = hostPort.length > 1 ? 
Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT; ++ ++ } else { ++ zkHost = zkStr; ++ } ++ // the following constructor is only available in 0.9.3 and later ++ try { ++ cluster = new LocalCluster(zkHost, zkPort); ++ } catch (NoSuchMethodError e){ ++ LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later."); ++ System.exit(1); ++ } ++ } else { ++ cluster = new LocalCluster(); ++ } ++ cluster.submitTopology(topologyName, conf, topology); ++ ++ Utils.sleep(sleepTime); ++ cluster.killTopology(topologyName); ++ cluster.shutdown(); ++ } ++ } ++ } ++ ++ static void printTopologyInfo(ExecutionContext ctx){ ++ TopologyDef t = ctx.getTopologyDef(); ++ if(t.isDslTopology()) { ++ print("---------- TOPOLOGY DETAILS ----------"); ++ ++ printf("Topology Name: %s", t.getName()); ++ print("--------------- SPOUTS ---------------"); ++ for (SpoutDef s : t.getSpouts()) { ++ printf("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName()); ++ } ++ print("---------------- BOLTS ---------------"); ++ for (BoltDef b : t.getBolts()) { ++ printf("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName()); ++ } ++ ++ print("--------------- STREAMS ---------------"); ++ for (StreamDef sd : t.getStreams()) { ++ printf("%s --%s--> %s", sd.getFrom(), sd.getGrouping().getType(), sd.getTo()); ++ } ++ print("--------------------------------------"); ++ } ++ } ++ ++ // save a little typing ++ private static void printf(String format, Object... args){ ++ print(String.format(format, args)); ++ } ++ ++ private static void print(String string){ ++ System.out.println(string); ++ } ++ ++ private static void printSplash() throws IOException { ++ // banner ++ InputStream is = Flux.class.getResourceAsStream("/splash.txt"); ++ if(is != null){ ++ BufferedReader br = new BufferedReader(new InputStreamReader(is)); ++ String line = null; ++ while((line = br.readLine()) != null){ ++ System.out.println(line); ++ } ++ } ++ } ++}