STORM-2090: Add integration test for storm windowing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b779ca4e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b779ca4e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b779ca4e Branch: refs/heads/master Commit: b779ca4e270f6f2a86d4d30336004e4a642ec690 Parents: fe413ab Author: Raghav Kumar Gautam <rag...@apache.org> Authored: Mon Sep 19 10:26:56 2016 -0700 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Fri Sep 23 10:32:28 2016 +0900 ---------------------------------------------------------------------- .gitignore | 3 + .travis.yml | 8 +- integration-test/README.md | 59 +++ integration-test/config/Vagrantfile | 183 +++++++++ integration-test/config/cluster.xml | 96 +++++ integration-test/config/common.sh | 21 ++ integration-test/config/etc-hosts | 18 + integration-test/config/install-storm.sh | 39 ++ integration-test/config/install-zookeeper.sh | 20 + integration-test/config/storm.yaml | 33 ++ integration-test/pom.xml | 250 ++++++++++++ integration-test/run-it.sh | 83 ++++ .../org/apache/storm/ExclamationTopology.java | 93 +++++ .../org/apache/storm/debug/DebugHelper.java | 39 ++ .../storm/st/topology/TestableTopology.java | 30 ++ .../topology/window/SlidingTimeCorrectness.java | 170 +++++++++ .../window/SlidingWindowCorrectness.java | 157 ++++++++ .../window/TumblingTimeCorrectness.java | 167 ++++++++ .../window/TumblingWindowCorrectness.java | 154 ++++++++ .../storm/st/topology/window/data/FromJson.java | 22 ++ .../storm/st/topology/window/data/TimeData.java | 110 ++++++ .../st/topology/window/data/TimeDataWindow.java | 91 +++++ .../apache/storm/st/utils/StringDecorator.java | 37 ++ .../org/apache/storm/st/utils/TimeUtil.java | 54 +++ .../test/java/org/apache/storm/st/DemoTest.java | 85 +++++ .../apache/storm/st/helper/AbstractTest.java | 27 ++ .../apache/storm/st/meta/TestngListener.java | 97 +++++ .../st/tests/window/SlidingWindowTest.java | 187 +++++++++ 
.../st/tests/window/TumblingWindowTest.java | 99 +++++ .../org/apache/storm/st/utils/AssertUtil.java | 71 ++++ .../org/apache/storm/st/wrapper/LogData.java | 66 ++++ .../apache/storm/st/wrapper/StormCluster.java | 118 ++++++ .../org/apache/storm/st/wrapper/TopoWrap.java | 378 +++++++++++++++++++ .../src/test/resources/storm-conf/storm.yaml | 33 ++ pom.xml | 3 + 35 files changed, 3099 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 5c66aba..636570f 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,6 @@ logs # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + +# ignore vagrant files +/integration-test/config/.vagrant/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 941e09c..3aca6a8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+addons: + hosts: + - node1 env: - MODULES=storm-core @@ -22,8 +25,9 @@ before_install: - nvm install 0.12.2 - nvm use 0.12.2 install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd` -script: /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES -sudo: false +script: + - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES && /bin/bash ./integration-test/run-it.sh +sudo: true cache: directories: - "$HOME/.m2/repository" http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/README.md ---------------------------------------------------------------------- diff --git a/integration-test/README.md b/integration-test/README.md new file mode 100644 index 0000000..36c70a1 --- /dev/null +++ b/integration-test/README.md @@ -0,0 +1,59 @@ +End to end storm integration tests +================================== + +Running tests end-to-end +------------------------ +Assumption: +A single version of storm binary zip such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip` is present +The following command will bring up a vagrant cluster. +```sh +cd integration-test/config +vagrant up +``` +This will automatically run `integration-test/run-it.sh`. +This brings up a vagrant machine, with storm and zookeeper daemons. +And runs all the tests against it. + +Running tests for development & debugging +========================================= +```vagrant up``` command is set up as a complete auto-pilot. +The following describes how we can run individual tests against this vagrant cluster or any other cluster. + +Configs for running +------------------- +The supplied configuration will run tests against vagrant setup. However, it can be changed to use a different cluster. +Change `integration-test/src/test/resources/storm-conf/storm.yaml` as necessary. 
+ +Running all tests manually +-------------------------- +To run all tests: +```sh +mvn clean package -DskipTests && mvn test +``` + +To run a single test: +```sh +mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest +``` + +Running tests from IDE +---------------------- +You might have to enable the intellij profile to make your IDE happy. +Make sure that the following is run before tests are launched. +```sh +mvn package -DskipTests +``` + +Running tests with custom storm version +--------------------------------------- +You can supply a custom storm version using the `-Dstorm.version=<storm-version>` property to all the maven commands. +```sh +mvn clean package -DskipTests -Dstorm.version=<storm-version> +mvn test -Dtest=DemoTest -Dstorm.version=<storm-version> +``` + +To find the version of storm that you are running, run the `storm version` command. + +Code +---- +Start off by looking at file [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java). http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/Vagrantfile ---------------------------------------------------------------------- diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile new file mode 100644 index 0000000..7898d9b --- /dev/null +++ b/integration-test/config/Vagrantfile @@ -0,0 +1,183 @@ +# -*- mode: ruby; compile-command: "vagrant destroy -f; vagrant up" -*- +# vi: set ft=ruby : +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +require 'uri' +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" +STORM_BOX_TYPE = "hashicorp/precise64" +STORM_ZIP = Dir.glob("../../storm-dist/binary/target/**/*.zip") +if(STORM_ZIP.length != 1) + raise "expected one storm-binary found: " + STORM_ZIP.join(",") +end +STORM_ARCHIVE = STORM_ZIP[0] +STORM_VERSION = File.basename(STORM_ARCHIVE, '.*') +STORM_SUPERVISOR_COUNT = 2 + +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + + config.vm.box = STORM_BOX_TYPE + #config.hostmanager.manage_host = true + #config.hostmanager.enabled = true + + if Vagrant.has_plugin?("vagrant-cachier") + # Configure cached packages to be shared between instances of the same base box. + # More info on the "Usage" link above + config.cache.scope = :box + + # OPTIONAL: If you are using VirtualBox, you might want to use that to enable + # NFS for shared folders. This is also very useful for vagrant-libvirt if you + # want bi-directional sync + config.cache.synced_folder_opts = { + type: :nfs, + # The nolock option can be useful for an NFSv3 client that wants to avoid the + # NLM sideband protocol. Without this option, apt-get might hang if it tries + # to lock files needed for /var/cache/* operations. All of this can be avoided + # by using NFSv4 everywhere. Please note that the tcp option is not the default. 
+ mount_options: ['rw', 'vers=3', 'tcp', 'nolock'] + } + end + + if(!File.exist?(STORM_ARCHIVE)) + `wget -N #{STORM_DIST_URL}` + end + + config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm" + config.vm.synced_folder "~/.m2", "/home/vagrant/.m2" + + config.vm.define "node1" do |node1| + node1.vm.provider "virtualbox" do |v| + v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"] + end + node1.vm.network "private_network", ip: "192.168.50.3" + node1.vm.hostname = "node1" + node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false + #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts" + end + + + # All Vagrant configuration is done here. The most common configuration + # options are documented and commented below. For a complete reference, + # please see the online documentation at vagrantup.com. + + # Every Vagrant virtual environment requires a box to build off of. + #config.vm.box = "precise32" + + # The url from where the 'config.vm.box' box will be fetched if it + # doesn't already exist on the user's system. + #config.vm.box_url = "http://files.vagrantup.com/precise32.box" + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + # config.vm.network :forwarded_port, guest: 8080, host: 8080 + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + config.vm.network :private_network, ip: "192.168.100.100" + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + # config.vm.network :public_network + + # If true, then any SSH connections made will enable agent forwarding. 
+ # Default value: false + # config.ssh.forward_agent = true + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. + # config.vm.synced_folder "../data", "/vagrant_data" + + # Provider-specific configuration so you can fine-tune various + # backing providers for Vagrant. These expose provider-specific options. + # Example for VirtualBox: + # + config.vm.provider :virtualbox do |vb| + # # Don't boot with headless mode + vb.gui = false + # + # # Use VBoxManage to customize the VM. For example to change memory: + vb.customize ["modifyvm", :id, "--memory", "3072"] + end + # + # View the documentation for the provider you're using for more + # information on available options. + + # Enable provisioning with Puppet stand alone. Puppet manifests + # are contained in a directory path relative to this Vagrantfile. + # You will need to create the manifests directory and a manifest in + # the file precise32.pp in the manifests_path directory. + # + # An example Puppet manifest to provision the message of the day: + # + # # group { "puppet": + # # ensure => "present", + # # } + # # + # # File { owner => 0, group => 0, mode => 0644 } + # # + # # file { '/etc/motd': + # # content => "Welcome to your Vagrant-built virtual machine! + # # Managed by Puppet.\n" + # # } + # + # config.vm.provision :puppet do |puppet| + # puppet.manifests_path = "manifests" + # puppet.manifest_file = "site.pp" + # end + + # Enable provisioning with chef solo, specifying a cookbooks path, roles + # path, and data_bags path (all relative to this Vagrantfile), and adding + # some recipes and/or roles. 
+ # + # config.vm.provision :chef_solo do |chef| + # chef.cookbooks_path = "../my-recipes/cookbooks" + # chef.roles_path = "../my-recipes/roles" + # chef.data_bags_path = "../my-recipes/data_bags" + # chef.add_recipe "mysql" + # chef.add_role "web" + # + # # You may also specify custom JSON attributes: + # chef.json = { :mysql_password => "foo" } + # end + + # Enable provisioning with chef server, specifying the chef server URL, + # and the path to the validation key (relative to this Vagrantfile). + # + # The Opscode Platform uses HTTPS. Substitute your organization for + # ORGNAME in the URL and validation key. + # + # If you have your own Chef Server, use the appropriate URL, which may be + # HTTP instead of HTTPS depending on your configuration. Also change the + # validation key to validation.pem. + # + # config.vm.provision :chef_client do |chef| + # chef.chef_server_url = "https://api.opscode.com/organizations/ORGNAME" + # chef.validation_key_path = "ORGNAME-validator.pem" + # end + # + # If you're using the Opscode platform, your validator client is + # ORGNAME-validator, replacing ORGNAME with your organization name. + # + # If you have your own Chef Server, the default validation client name is + # chef-validator, unless you changed the configuration. + # + # chef.validation_client_name = "ORGNAME-validator" +end http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/cluster.xml ---------------------------------------------------------------------- diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml new file mode 100644 index 0000000..d37e6e6 --- /dev/null +++ b/integration-test/config/cluster.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. 
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration scan="true" scanPeriod="60 seconds"> + <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>/var/log/storm/${logfile.name}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>9</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern> + </encoder> + </appender> + + <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>/var/log/storm/access.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>9</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern> + </encoder> + </appender> + + <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>/var/log/storm/metrics.log</file> + <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>metrics.log.%i</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>9</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>2MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>%d %-8r %m%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="A1"/> + </root> + + <logger name="backtype.storm.messaging.netty"> + <level value="WARN" /> + <appender-ref ref="A1" /> + </logger> + + <logger name="backtype.storm"> + <level value="DEBUG" /> + <appender-ref ref="A1" /> + </logger> + + <logger name="backtype.storm.security.auth.authorizer" additivity="false"> + <level value="INFO" /> + <appender-ref ref="ACCESS" /> + </logger> + + <logger name="backtype.storm.metric.LoggingMetricsConsumer" additivity="false" > + <level value="INFO"/> + <appender-ref ref="METRICS"/> + </logger> + +</configuration> http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/common.sh ---------------------------------------------------------------------- diff --git a/integration-test/config/common.sh b/integration-test/config/common.sh new file mode 100644 index 0000000..c89319c --- /dev/null +++ b/integration-test/config/common.sh @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +USER_SCRIPT="user-script.sh" +[[ -f $USER_SCRIPT ]] && echo "Running ${USER_SCRIPT}" && bash ${USER_SCRIPT} || echo "${USER_SCRIPT} not found/executed, continuing." +#apt-get update +#apt-get --yes remove openjdk-6-jre-headless +#apt-get --yes install openjdk-7-jdk http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/etc-hosts ---------------------------------------------------------------------- diff --git a/integration-test/config/etc-hosts b/integration-test/config/etc-hosts new file mode 100644 index 0000000..45e2aa6 --- /dev/null +++ b/integration-test/config/etc-hosts @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +127.0.0.1 localhost localhost +192.168.100.100 node-1 http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/install-storm.sh ---------------------------------------------------------------------- diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh new file mode 100644 index 0000000..2731abe --- /dev/null +++ b/integration-test/config/install-storm.sh @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# $1 is the storm binary zip file +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +groupadd storm +useradd --gid storm --home-dir /home/storm --create-home --shell /bin/bash storm + +unzip -o "$1" -d /usr/share/ +chown -R storm:storm /usr/share/apache-storm* +ln -s /usr/share/apache-storm* /usr/share/storm +ln -s /usr/share/storm/bin/storm /usr/bin/storm + +mkdir /etc/storm +chown storm:storm /etc/storm + +rm /usr/share/storm/conf/storm.yaml +cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/ +cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/ +ln -s /usr/share/storm/conf/storm.yaml /etc/storm/storm.yaml + +mkdir /var/log/storm +chown storm:storm /var/log/storm + +#sed -i 's/${storm.home}\/logs/\/var\/log\/storm/g' /usr/share/storm/logback/cluster.xml http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/install-zookeeper.sh ---------------------------------------------------------------------- diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh new file mode 100644 index 0000000..a81a07c --- /dev/null +++ b/integration-test/config/install-zookeeper.sh @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5* +service zookeeper stop +echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg +service zookeeper start http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/config/storm.yaml ---------------------------------------------------------------------- diff --git a/integration-test/config/storm.yaml b/integration-test/config/storm.yaml new file mode 100644 index 0000000..eca352f --- /dev/null +++ b/integration-test/config/storm.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +storm.zookeeper.servers: + - "node1" + +nimbus.seeds: ["node1"] + +# netty transport +storm.messaging.transport: "org.apache.storm.messaging.netty.Context" +storm.messaging.netty.buffer_size: 16384 +storm.messaging.netty.max_retries: 10 +storm.messaging.netty.min_wait_ms: 1000 +storm.messaging.netty.max_wait_ms: 5000 + +drpc.servers: + - "node1" + +supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709] http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration-test/pom.xml b/integration-test/pom.xml new file mode 100755 index 0000000..47eb244 --- /dev/null +++ b/integration-test/pom.xml @@ -0,0 +1,250 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <name>storm-integration-test</name> + <groupId>org.apache.storm</groupId> + <artifactId>storm-integration-test</artifactId> + <version>0.3</version> + <packaging>jar</packaging> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <!-- see comment below... This fixes an annoyance with intellij --> + <provided.scope>provided</provided.scope> + <storm.version>1.0.1.2.5.0.0-781</storm.version> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <regression.downloadWorkerLogs>false</regression.downloadWorkerLogs> + <storm.conf.dir>/etc/storm/conf</storm.conf.dir> + <hadoop.conf.dir>/etc/hadoop/conf</hadoop.conf.dir> + </properties> + + <profiles> + <profile> + <id>intellij</id> + <properties> + <provided.scope>compile</provided.scope> + <regression.downloadWorkerLogs>true</regression.downloadWorkerLogs> + <storm.conf.dir>src/test/resources/storm-conf/</storm.conf.dir> + <hadoop.conf.dir>src/main/resources/hadoop-conf/</hadoop.conf.dir> + </properties> + </profile> + </profiles> + + <repositories> + <repository> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + <id>central</id> + <url>http://repo1.maven.org/maven2/</url> + </repository> + <repository> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + <id>clojars</id> + <url>https://clojars.org/repo/</url> + </repository> + <repository> + <id>hortonworks</id> + <url>http://nexus-private.hortonworks.com/nexus/content/groups/public/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <version>6.8.5</version> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.3</version> + </dependency> + <dependency> + <groupId>org.seleniumhq.selenium</groupId> + <artifactId>selenium-firefox-driver</artifactId> + <version>2.45.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.seleniumhq.selenium</groupId> + <artifactId>selenium-support</artifactId> + <version>2.45.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${storm.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-solr</artifactId> + <version>${storm.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-starter</artifactId> + <version>${storm.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hdfs</artifactId> + <version>${storm.version}</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18.1</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <configuration> + <encoding>UTF-8</encoding> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile> + <forkCount>1C</forkCount> + <argLine>-Xmx1024m</argLine> + <properties> + <property> + <name>listener</name> + <value>org.apache.storm.st.meta.TestngListener</value> + </property> + </properties> + <systemPropertyVariables> + <regression.downloadWorkerLogs>${regression.downloadWorkerLogs}</regression.downloadWorkerLogs> + </systemPropertyVariables> + <additionalClasspathElements> + <!-- Hack to get the dir in CP through CLI and idea --> + <additionalClasspathElement>${storm.conf.dir}</additionalClasspathElement> + <additionalClasspathElement>${hadoop.conf.dir}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.1}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.2}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.3}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.4}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.5}</additionalClasspathElement> + <additionalClasspathElement>${extra.classpath.6}</additionalClasspathElement> + </additionalClasspathElements> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + 
<exclude>META-INF/*.sf</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.dsa</exclude> + <exclude>META-INF/*.RSA</exclude> + <exclude>META-INF/*.rsa</exclude> + <exclude>META-INF/*.EC</exclude> + <exclude>META-INF/*.ec</exclude> + <exclude>META-INF/MSFTSIG.SF</exclude> + <exclude>META-INF/MSFTSIG.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + <resources> + <resource> + <directory>${hadoop.conf.dir}</directory> + </resource> + </resources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/run-it.sh ---------------------------------------------------------------------- diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh new file mode 100755 index 0000000..3b4a861 --- /dev/null +++ b/integration-test/run-it.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# -*- compile-command: "cd config/ && vagrant destroy -f; vagrant up" -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Trace every command and abort on the first one that fails.
+set -x
+set -e
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+echo SCRIPT_DIR="${SCRIPT_DIR}"
+# The storm source tree is the parent of the directory that holds this script.
+STORM_SRC_DIR="$(dirname "${SCRIPT_DIR}")"
+echo STORM_SRC_DIR="${STORM_SRC_DIR}"
+
+# Print the given message and terminate the script with exit status 1.
+function die() {
+  echo "$*"
+  exit 1
+}
+
+# List the running zookeeper processes and the processes matching "storm.home".
+# Returns success only when both greps find at least one process.
+function list_storm_processes() {
+  (ps -ef | grep -i -e zookeeper | grep -v grep) && (ps -ef | grep -i -e storm.home | grep -v grep)
+}
+
+# Informational only: nothing is expected to be running yet, so ignore the status.
+list_storm_processes || true
+# increasing swap space so we can run lots of workers
+sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
+sudo mkswap /swapfile.img
+sudo swapon /swapfile.img
+
+if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
+  sudo apt-get update
+  sudo apt-get -y install python-software-properties
+  sudo apt-add-repository -y ppa:webupd8team/java
+  sudo apt-get update
+  echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections
+  sudo apt-get install -y oracle-java8-installer
+  sudo apt-get -y install maven
+  java -version
+  mvn --version
+  export MAVEN_OPTS="-Xmx3000m"
+else
+  ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci
+  (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed"
+  (cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn package -Dgpg.skip=true) || die "maven package command failed"
+fi
+# Locate the binary distribution zip and derive the storm version from its file name:
+# everything from the first digit up to (but excluding) the ".zip" suffix.
+storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip')
+storm_binary_name=$(basename "${storm_binary_zip}")
+export STORM_VERSION=$(grep -oPe '\d.*(?=\.zip)' <<<"${storm_binary_name}")
+echo "Using storm version:" ${STORM_VERSION}
+
+# setup storm cluster
+list_storm_processes || true
+sudo bash "${SCRIPT_DIR}/config/common.sh"
+sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh"
+sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "${storm_binary_zip}"
+export JAVA_HOME
+env
+
+# Start the storm daemon named by $1 (nimbus, ui, ...) in the background as user "storm".
+function start_storm_process() {
+  echo starting: storm $1
+  sudo su storm -c "export JAVA_HOME=${JAVA_HOME} && cd /home/storm && storm $1" &
+}
+start_storm_process nimbus
+start_storm_process ui
+start_storm_process supervisor
+start_storm_process logviewer
+#start_storm_process drpc
+pushd "${SCRIPT_DIR}"
+mvn clean package -DskipTests -Dstorm.version=${STORM_VERSION}
+# Poll (20 attempts, 6 seconds apart) until zookeeper and storm processes are both visible.
+for i in {1..20} ; do
+  list_storm_processes && break
+  sleep 6
+done
+# With "set -e" this aborts the run when the processes still are not up.
+list_storm_processes
+mvn test -DfailIfNoTests=false -Dstorm.version=${STORM_VERSION} -Dui.url=http://localhost:8744
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
new file mode 100644
index 0000000..d464608
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.Map; + +/** + * This is a basic example of a Storm topology. 
+ */ +public class ExclamationTopology { + + public static final String WORD = "word"; + public static final String EXCLAIM_1 = "exclaim1"; + public static final String EXCLAIM_2 = "exclaim2"; + + public static class ExclamationBolt extends BaseRichBolt { + OutputCollector _collector; + + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + _collector = collector; + } + + @Override + public void execute(Tuple tuple) { + _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); + _collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + + } + + public static void main(String[] args) throws Exception { + StormTopology topology = getStormTopology(); + + Config conf = new Config(); + conf.setDebug(true); + + if (args != null && args.length > 0) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology); + } + else { + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, topology); + Utils.sleep(10000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } + + public static StormTopology getStormTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(WORD, new TestWordSpout(), 10); + builder.setBolt(EXCLAIM_1, new ExclamationTopology.ExclamationBolt(), 3).shuffleGrouping(WORD); + builder.setBolt(EXCLAIM_2, new ExclamationTopology.ExclamationBolt(), 2).shuffleGrouping(EXCLAIM_1); + return builder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java new file mode 100644 index 0000000..97c0554 --- 
/dev/null +++ b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.debug; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.net.URLClassLoader; + +public class DebugHelper { + private static final Logger LOG = LoggerFactory.getLogger(DebugHelper.class); + + public static void printClassPath() { + URL[] urls = getClassPaths(); + LOG.info("classpath:" + StringUtils.join(urls, ':')); + } + + public static URL[] getClassPaths() { + ClassLoader cl = ClassLoader.getSystemClassLoader(); + return ((URLClassLoader)cl).getURLs(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java new file mode 100644 index 0000000..3823310 --- /dev/null +++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology; + +import org.apache.storm.generated.StormTopology; + +import java.util.List; + +public interface TestableTopology { + String DUMMY_FIELD = "dummy"; + List<String> getExpectedOutput(); + StormTopology newTopology(); + String getBoltName(); + String getSpoutName(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java new file mode 100644 index 0000000..430449b --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.st.topology.TestableTopology; +import org.apache.storm.st.topology.window.data.TimeData; +import org.apache.storm.st.utils.StringDecorator; +import org.apache.storm.st.utils.TimeUtil; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * Computes sliding window sum + */ +public class SlidingTimeCorrectness implements 
TestableTopology { + private static final Logger LOG = LoggerFactory.getLogger(SlidingTimeCorrectness.class); + private final int windowSec; + private final int slideSec; + private final String spoutName; + private final String boltName; + + public SlidingTimeCorrectness(int windowSec, int slideSec) { + this.windowSec = windowSec; + this.slideSec = slideSec; + final String prefix = this.getClass().getSimpleName() + "-winSec" + windowSec + "slideSec" + slideSec; + spoutName = prefix + "IncrementingSpout"; + boltName = prefix + "VerificationBolt"; + } + + public String getBoltName() { + return boltName; + } + + public String getSpoutName() { + return spoutName; + } + + public StormTopology newTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(getSpoutName(), new IncrementingSpout(), 2); + builder.setBolt(getBoltName(), + new VerificationBolt() + .withWindow(new BaseWindowedBolt.Duration(windowSec, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(slideSec, TimeUnit.SECONDS)) + .withTimestampField(TimeData.getTimestampFieldName()) + .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), + 1) + .globalGrouping(getSpoutName()); + return builder.createTopology(); + } + + public List<String> getExpectedOutput() { + return Lists.newArrayList( + StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + windowSec), + StringDecorator.decorate(getBoltName(), "newTuples.size() = " + slideSec), + StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + slideSec) + ); + } + + public static class IncrementingSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class); + private SpoutOutputCollector collector; + private static int currentNum; + private static Random rng = new Random(); + private String componentId; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(TimeData.getFields()); + } + + @Override + public 
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void nextTuple() { + TimeUtil.sleepMilliSec(rng.nextInt(800)); + currentNum++; + TimeData data = TimeData.newData(currentNum); + final Values tuple = data.getValues(); + collector.emit(tuple); + LOG.info(StringDecorator.decorate(componentId, data.toString())); + } + + @Override + public void ack(Object msgId) { + LOG.info("Received ACK for msgId : " + msgId); + } + + @Override + public void fail(Object msgId) { + LOG.info("Received FAIL for msgId : " + msgId); + } + } + + public static class VerificationBolt extends BaseWindowedBolt { + private OutputCollector collector; + private String componentId; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void execute(TupleWindow inputWindow) { + List<Tuple> tuplesInWindow = inputWindow.get(); + List<Tuple> newTuples = inputWindow.getNew(); + List<Tuple> expiredTuples = inputWindow.getExpired(); + LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size()); + LOG.info("newTuples.size() = " + newTuples.size()); + LOG.info("expiredTuples.size() = " + expiredTuples.size()); + Collection<TimeData> dataInWindow = Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() { + @Nullable + @Override + public TimeData apply(@Nullable Tuple input) { + return TimeData.fromTuple(input); + } + }); + final String jsonData = TimeData.toString(dataInWindow); + LOG.info(StringDecorator.decorate(componentId, jsonData)); + collector.emit(new Values("dummyValue")); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(DUMMY_FIELD)); + } + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java new file mode 100644 index 0000000..33ee004 --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.st.topology.window; + +import com.google.common.collect.Lists; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.apache.storm.st.topology.TestableTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.st.utils.StringDecorator; +import org.apache.storm.st.utils.TimeUtil; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * Computes sliding window sum + */ +public class SlidingWindowCorrectness implements TestableTopology { + private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowCorrectness.class); + private static final String NUMBER_FIELD = "number"; + private static final String STRING_FIELD = "numAsStr"; + private final int windowSize; + private final int slideSize; + private final String spoutName; + private final String boltName; + + public SlidingWindowCorrectness(int windowSize, int slideSize) { + this.windowSize = windowSize; + this.slideSize = slideSize; + final String prefix = this.getClass().getSimpleName() + "-winSize" + windowSize + "slideSize" + slideSize; + spoutName = prefix + "IncrementingSpout"; + boltName = prefix + "VerificationBolt"; + } + + public String getBoltName() { + return boltName; + } + + public String getSpoutName() { + return spoutName; + } + + public StormTopology newTopology() { + TopologyBuilder builder = new TopologyBuilder(); + 
builder.setSpout(getSpoutName(), new IncrementingSpout(), 1); + builder.setBolt(getBoltName(), + new VerificationBolt() + .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) + .withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)), + 1) + .shuffleGrouping(getSpoutName()); + return builder.createTopology(); + } + + public List<String> getExpectedOutput() { + return Lists.newArrayList( + StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + windowSize), + StringDecorator.decorate(getBoltName(), "newTuples.size() = " + slideSize), + StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + slideSize) + ); + } + + public static class IncrementingSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class); + private SpoutOutputCollector collector; + private static int currentNum; + private static Random rng = new Random(); + private String componentId; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD)); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void nextTuple() { + TimeUtil.sleepMilliSec(rng.nextInt(10)); + currentNum++; + final String numAsStr = "str(" + currentNum + ")str"; + final Values tuple = new Values(currentNum, numAsStr); + LOG.info(StringDecorator.decorate(componentId, tuple.toString())); + collector.emit(tuple, currentNum); + } + + @Override + public void ack(Object msgId) { + LOG.info("Received ACK for msgId : " + msgId); + } + + @Override + public void fail(Object msgId) { + LOG.info("Received FAIL for msgId : " + msgId); + } + } + + public static class VerificationBolt extends BaseWindowedBolt { + private OutputCollector collector; + private String componentId; + + @Override + 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void execute(TupleWindow inputWindow) { + List<Tuple> tuplesInWindow = inputWindow.get(); + List<Tuple> newTuples = inputWindow.getNew(); + List<Tuple> expiredTuples = inputWindow.getExpired(); + LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size()); + LOG.info("newTuples.size() = " + newTuples.size()); + LOG.info("expiredTuples.size() = " + expiredTuples.size()); + LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " + tuplesInWindow.toString())); + collector.emit(new Values("dummyValue")); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(DUMMY_FIELD)); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java new file mode 100644 index 0000000..a77836f --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.st.topology.TestableTopology; +import org.apache.storm.st.topology.window.data.TimeData; +import org.apache.storm.st.utils.TimeUtil; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.apache.storm.st.utils.StringDecorator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * Computes sliding window sum + */ +public class TumblingTimeCorrectness implements TestableTopology { + private static final Logger LOG = LoggerFactory.getLogger(TumblingTimeCorrectness.class); + private final int tumbleSec; + private final String spoutName; + private final String boltName; + + public TumblingTimeCorrectness(int timbleSec) { + 
this.tumbleSec = timbleSec; + final String prefix = this.getClass().getSimpleName() + "-timbleSec" + timbleSec; + spoutName = prefix + "IncrementingSpout"; + boltName = prefix + "VerificationBolt"; + } + + public String getBoltName() { + return boltName; + } + + public String getSpoutName() { + return spoutName; + } + + public StormTopology newTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(getSpoutName(), new IncrementingSpout(), 2); + builder.setBolt(getBoltName(), + new VerificationBolt() + .withTumblingWindow(new BaseWindowedBolt.Duration(tumbleSec, TimeUnit.SECONDS)) + .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) + .withTimestampField(TimeData.getTimestampFieldName()), + 1) + .globalGrouping(getSpoutName()); + return builder.createTopology(); + } + + public List<String> getExpectedOutput() { + return Lists.newArrayList( + StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + tumbleSec), + StringDecorator.decorate(getBoltName(), "newTuples.size() = " + tumbleSec), + StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + tumbleSec) + ); + } + + public static class IncrementingSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class); + private SpoutOutputCollector collector; + private static int currentNum; + private static Random rng = new Random(); + private String componentId; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(TimeData.getFields()); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void nextTuple() { + TimeUtil.sleepMilliSec(rng.nextInt(800)); + currentNum++; + TimeData data = TimeData.newData(currentNum); + final Values tuple = data.getValues(); + collector.emit(tuple); + 
LOG.info(StringDecorator.decorate(componentId, data.toString())); + } + + @Override + public void ack(Object msgId) { + LOG.info("Received ACK for msgId : " + msgId); + } + + @Override + public void fail(Object msgId) { + LOG.info("Received FAIL for msgId : " + msgId); + } + } + + public static class VerificationBolt extends BaseWindowedBolt { + private OutputCollector collector; + private String componentId; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + componentId = context.getThisComponentId(); + this.collector = collector; + } + + @Override + public void execute(TupleWindow inputWindow) { + List<Tuple> tuplesInWindow = inputWindow.get(); + List<Tuple> newTuples = inputWindow.getNew(); + List<Tuple> expiredTuples = inputWindow.getExpired(); + LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size()); + LOG.info("newTuples.size() = " + newTuples.size()); + LOG.info("expiredTuples.size() = " + expiredTuples.size()); + Collection<TimeData> dataInWindow = Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() { + @Nullable + @Override + public TimeData apply(@Nullable Tuple input) { + return TimeData.fromTuple(input); + } + }); + final String jsonData = TimeData.toString(dataInWindow); + LOG.info(StringDecorator.decorate(componentId, jsonData)); + collector.emit(new Values("dummyValue")); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(DUMMY_FIELD)); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java new file mode 100644 index 
0000000..22c6d75 --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window; + +import com.google.common.collect.Lists; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.apache.storm.st.topology.TestableTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.st.utils.StringDecorator; +import org.apache.storm.st.utils.TimeUtil; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +
/**
 * Test topology for count-based tumbling windows.
 *
 * <p>An {@link IncrementingSpout} emits an increasing number (as a number and as a string) and a
 * {@link VerificationBolt}, configured with a tumbling window of {@code tumbleSize} tuples,
 * logs the sizes and the content of every window it is handed.
 */
public class TumblingWindowCorrectness implements TestableTopology {
    private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowCorrectness.class);
    private static final String NUMBER_FIELD = "number";
    private static final String STRING_FIELD = "numAsStr";
    private final int tumbleSize;
    private final String spoutName;
    private final String boltName;

    /**
     * Creates the topology description and derives the spout and bolt names from the window size.
     *
     * @param tumbleSize number of tuples in each tumbling window
     */
    public TumblingWindowCorrectness(final int tumbleSize) {
        this.tumbleSize = tumbleSize;
        // The "-tubleSize" spelling is kept on purpose: it is part of the generated component names.
        final String prefix = this.getClass().getSimpleName() + "-tubleSize" + tumbleSize;
        spoutName = prefix + "IncrementingSpout";
        boltName = prefix + "VerificationBolt";
    }

    /**
     * @return the component id under which the {@link VerificationBolt} is registered
     */
    public String getBoltName() {
        return boltName;
    }

    /**
     * @return the component id under which the {@link IncrementingSpout} is registered
     */
    public String getSpoutName() {
        return spoutName;
    }

    /**
     * Wires one spout task to one windowed bolt task through a shuffle grouping.
     *
     * @return a topology with a 10 second lag and a count-based tumbling window of {@code tumbleSize}
     */
    public StormTopology newTopology() {
        final BaseWindowedBolt.Duration lag = new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS);
        final BaseWindowedBolt.Count windowLength = new BaseWindowedBolt.Count(tumbleSize);

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
        builder.setBolt(getBoltName(),
                new VerificationBolt()
                        .withLag(lag)
                        .withTumblingWindow(windowLength), 1)
                .shuffleGrouping(getSpoutName());
        return builder.createTopology();
    }

    /**
     * Builds the three size messages, decorated with the bolt name, for a full window.
     *
     * <p>NOTE(review): {@link VerificationBolt#execute} logs its size messages without
     * {@link StringDecorator}; confirm how callers match this list against the logs.
     *
     * @return decorated strings for the total, new and expired tuple counts, each equal to {@code tumbleSize}
     */
    public List<String> getExpectedOutput() {
        return Lists.newArrayList(
                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + tumbleSize),
                StringDecorator.decorate(getBoltName(), "newTuples.size() = " + tumbleSize),
                StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + tumbleSize)
        );
    }

    /**
     * Spout that emits a counter value and its string form, pausing a random 0-9 (argument to
     * {@code TimeUtil.sleepMilliSec}) between emissions.
     */
    public static class IncrementingSpout extends BaseRichSpout {
        private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class);
        // Static: the counter is shared by all instances of this spout loaded in the same JVM.
        private static int currentNum;
        private static final Random rng = new Random();
        private SpoutOutputCollector collector;
        private String componentId;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD));
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            componentId = context.getThisComponentId();
            this.collector = collector;
        }

        /**
         * Increments the counter, logs the decorated tuple and emits it with the counter as message id.
         */
        @Override
        public void nextTuple() {
            TimeUtil.sleepMilliSec(rng.nextInt(10));
            currentNum++;
            final String numAsStr = "str(" + currentNum + ")str";
            final Values tuple = new Values(currentNum, numAsStr);
            LOG.info(StringDecorator.decorate(componentId, tuple.toString()));
            collector.emit(tuple, currentNum);
        }

        @Override
        public void ack(Object msgId) {
            LOG.info("Received ACK for msgId : {}", msgId);
        }

        @Override
        public void fail(Object msgId) {
            LOG.info("Received FAIL for msgId : {}", msgId);
        }
    }

    /**
     * Windowed bolt that logs, for every window, the number of current, new and expired tuples
     * plus the decorated list of tuples, then emits a single dummy value.
     */
    public static class VerificationBolt extends BaseWindowedBolt {
        private OutputCollector collector;
        private String componentId;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            componentId = context.getThisComponentId();
            this.collector = collector;
        }

        @Override
        public void execute(TupleWindow inputWindow) {
            final List<Tuple> tuplesInWindow = inputWindow.get();
            final List<Tuple> newTuples = inputWindow.getNew();
            final List<Tuple> expiredTuples = inputWindow.getExpired();
            // Logged through the enclosing class's logger; only the content line is decorated.
            LOG.info("tuplesInWindow.size() = {}", tuplesInWindow.size());
            LOG.info("newTuples.size() = {}", newTuples.size());
            LOG.info("expiredTuples.size() = {}", expiredTuples.size());
            LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " + tuplesInWindow.toString()));
            collector.emit(new Values("dummyValue"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // DUMMY_FIELD is not declared in this file; presumably inherited from TestableTopology - confirm.
            declarer.declare(new Fields(DUMMY_FIELD));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java new file mode 100644 index 0000000..d749abf --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window.data; + /** + * Contract for types that can rebuild a value of type {@code T} from a JSON string. + * + * @param <T> the type produced by {@link #fromJson(String)} + */ +public interface FromJson<T> { + /** + * Parses the given JSON text into a {@code T}. + * + * @param jsonStr JSON text to parse + * @return the value described by {@code jsonStr} + */ T fromJson(String jsonStr); +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java new file mode 100644 index 0000000..cd2c7a5 --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window.data; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Collection; +import java.util.Date; +
/**
 * A number paired with the date (and its epoch-millisecond timestamp) at which it was created.
 *
 * <p>Instances can be converted to Storm tuple values, rebuilt from a tuple, and serialized to or
 * parsed from JSON through the shared {@link #gson} instance. Natural ordering is by timestamp,
 * then by number.
 */
public class TimeData implements Comparable<TimeData>, FromJson<TimeData> {
    /** Placeholder instance (number -1); presumably used to reach {@link #fromJson(String)} - confirm with callers. */
    public static final TimeData CLS = new TimeData(-1);
    private static final String NUMBER_FIELD_NAME = "number";
    private static final String STRING_FIELD_NAME = "dateAsStr";
    private static final String TIMESTAMP_FIELD_NAME = "date";
    // The trailing 'Z' in the pattern is a quoted literal, not a zone designator.
    static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").create();
    private final int num;
    private final Date now;
    private final long timestamp;

    private TimeData(int num) {
        this(num, new Date());
    }

    private TimeData(int num, Date date) {
        this.num = num;
        this.now = date;
        this.timestamp = date.getTime();
    }

    /**
     * Creates an instance stamped with the current time.
     *
     * @param num the number to carry
     * @return a new instance dated "now"
     */
    public static TimeData newData(int num) {
        return new TimeData(num);
    }

    /**
     * Rebuilds an instance from a tuple laid out as described by {@link #getFields()}.
     *
     * @param tuple tuple holding the number field and the timestamp field
     * @return an instance with the tuple's number and a date built from its timestamp
     */
    public static TimeData fromTuple(Tuple tuple) {
        final Integer number = tuple.getIntegerByField(NUMBER_FIELD_NAME);
        final Long epochMillis = tuple.getLongByField(TIMESTAMP_FIELD_NAME);
        return new TimeData(number, new Date(epochMillis));
    }

    /**
     * Parses one instance from JSON using {@link #gson}.
     *
     * @param jsonStr JSON text of a single {@code TimeData}
     * @return the parsed instance
     */
    @Override
    public TimeData fromJson(String jsonStr) {
        return gson.fromJson(jsonStr, TimeData.class);
    }

    /**
     * @return the JSON form of this instance as produced by {@link #gson}
     */
    @Override
    public String toString() {
        return gson.toJson(this);
    }

    /**
     * @param elements items to serialize
     * @return the JSON form of the whole collection as produced by {@link #gson}
     */
    public static String toString(Collection<TimeData> elements) {
        return gson.toJson(elements);
    }

    /**
     * @return tuple values in the order of {@link #getFields()}: number, date string, timestamp
     */
    public Values getValues() {
        return new Values(num, now.toString(), timestamp);
    }

    /**
     * @return the name of the tuple field that carries the epoch-millisecond timestamp
     */
    public static String getTimestampFieldName() {
        return TIMESTAMP_FIELD_NAME;
    }

    /**
     * @return the creation date held by this instance (the internal object, not a copy)
     */
    public Date getDate() {
        return now;
    }

    /**
     * @return the tuple field names matching {@link #getValues()}
     */
    public static Fields getFields() {
        return new Fields(NUMBER_FIELD_NAME, STRING_FIELD_NAME, TIMESTAMP_FIELD_NAME);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final TimeData other = (TimeData) o;
        return num == other.num
                && timestamp == other.timestamp
                && now.equals(other.now);
    }

    @Override
    public int hashCode() {
        int result = num;
        result = 31 * result + now.hashCode();
        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
        return result;
    }

    /**
     * Orders by timestamp; items sharing a timestamp are ordered by number.
     *
     * <p>The tie-break keeps the ordering deterministic for items created within the same
     * millisecond and avoids reporting 0 for items that {@link #equals(Object)} treats as different
     * because their numbers differ.
     */
    @Override
    public int compareTo(TimeData o) {
        final int byTimestamp = Long.compare(timestamp, o.timestamp);
        if (byTimestamp != 0) {
            return byTimestamp;
        }
        return Integer.compare(num, o.num);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java new file mode 100644 index 0000000..d6cb9d6 --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.topology.window.data; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.gson.reflect.TypeToken; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +
/**
 * A list of {@link TimeData} that is sorted by the elements' natural order when it is created
 * through one of the {@code newInstance} factories or {@link #fromJson(String)}.
 */
public class TimeDataWindow extends ArrayList<TimeData> implements FromJson<TimeDataWindow> {
    /** Empty placeholder instance; presumably used to reach {@link #fromJson(String)} - confirm with callers. */
    public static final TimeDataWindow CLS = new TimeDataWindow();
    private static final Type listType = new TypeToken<List<TimeData>>() {}.getType();

    private TimeDataWindow() {
    }

    private TimeDataWindow(List<TimeData> data) {
        super(data);
    }

    /**
     * Creates a window holding a sorted copy of the given items; the argument is not modified.
     *
     * @param data items to copy
     * @return a new window with the items in natural order
     */
    public static TimeDataWindow newInstance(Collection<TimeData> data) {
        final List<TimeData> sorted = new ArrayList<>(data);
        Collections.sort(sorted);
        return new TimeDataWindow(sorted);
    }

    /**
     * Creates a sorted window from the items accepted by the predicate.
     *
     * @param data      candidate items
     * @param predicate filter deciding which items are kept
     * @return a new window with the accepted items in natural order
     */
    public static TimeDataWindow newInstance(Collection<TimeData> data, Predicate<TimeData> predicate) {
        final Collection<TimeData> accepted = Collections2.filter(data, predicate);
        return newInstance(accepted);
    }

    /**
     * Creates a sorted window from the items dated strictly after {@code fromDate} and no later
     * than {@code toDate} (the upper bound is inclusive at millisecond granularity). Null items
     * are dropped.
     *
     * @param data     candidate items
     * @param fromDate exclusive lower bound
     * @param toDate   inclusive upper bound
     * @return a new window with the matching items in natural order
     */
    public static TimeDataWindow newInstance(Collection<TimeData> data, final DateTime fromDate, final DateTime toDate) {
        final Predicate<TimeData> withinRange = new Predicate<TimeData>() {
            @Override
            public boolean apply(@Nullable TimeData input) {
                if (input == null) {
                    return false;
                }
                final DateTime inputDate = new DateTime(input.getDate());
                // Adding one millisecond to the upper bound makes toDate itself acceptable.
                return inputDate.isAfter(fromDate) && inputDate.isBefore(toDate.plusMillis(1));
            }
        };
        return TimeDataWindow.newInstance(data, withinRange);
    }

    /**
     * @return the element at index 0; fails with the list's index exception when the window is empty
     */
    public TimeData first() {
        return get(0);
    }

    /**
     * @return the element at the last index; fails with the list's index exception when the window is empty
     */
    public TimeData last() {
        return get(size() - 1);
    }

    /**
     * @return a one-line summary with the item count and, when not empty, the first and last items
     */
    public String getDescription() {
        final int size = size();
        if (size <= 0) {
            return "Total " + size + " items.";
        }
        return "Total " + size + " items: " + first() + " to " + last();
    }

    /**
     * Parses a JSON array of {@link TimeData} into a sorted window.
     *
     * @param jsonStr JSON text of a list of {@code TimeData}
     * @return a new window with the parsed items in natural order
     */
    @Override
    public TimeDataWindow fromJson(String jsonStr) {
        final List<TimeData> parsed = TimeData.gson.fromJson(jsonStr, listType);
        // newInstance already copies and sorts, so the list is not sorted a second time here.
        return TimeDataWindow.newInstance(parsed);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java new file mode 100644 index 0000000..34c2b65 --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.utils; + +import org.apache.commons.lang.StringUtils; +
/**
 * Static helpers that join a component id and a message around a fixed marker string, and that
 * detect or split text containing that marker.
 */
public class StringDecorator {

    private static final String UNIQUE_PREFIX = "---bed91874d79720f7e324c43d49dba4ff---";

    /**
     * Builds {@code componentId + marker + decorate}.
     *
     * @param componentId text placed before the marker
     * @param decorate    text placed after the marker
     * @return the combined string
     */
    public static String decorate(String componentId, String decorate) {
        final StringBuilder combined = new StringBuilder();
        combined.append(componentId);
        combined.append(UNIQUE_PREFIX);
        combined.append(decorate);
        return combined.toString();
    }

    /**
     * @param str text to inspect, may be {@code null}
     * @return {@code true} when {@code str} is non-null and contains the marker
     */
    public static boolean isDecorated(String str) {
        if (str == null) {
            return false;
        }
        return str.indexOf(UNIQUE_PREFIX) >= 0;
    }

    /**
     * Splits the text on the marker, asking for at most two parts.
     *
     * @param decoratedString text to split
     * @return the result of {@code StringUtils.splitByWholeSeparator} with a limit of 2
     */
    public static String[] split2(String decoratedString) {
        final int maxParts = 2;
        return StringUtils.splitByWholeSeparator(decoratedString, UNIQUE_PREFIX, maxParts);
    }
}