STORM-2090: Add integration test for storm windowing

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b812b7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b812b7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b812b7b

Branch: refs/heads/1.0.x-branch
Commit: 1b812b7bd3078bd52d5447579a8041e990b88028
Parents: ffcd615
Author: Raghav Kumar Gautam <rag...@apache.org>
Authored: Mon Sep 19 10:26:56 2016 -0700
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Fri Sep 23 10:34:22 2016 +0900

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 .travis.yml                                     |   8 +-
 integration-test/README.md                      |  59 +++
 integration-test/config/Vagrantfile             | 183 +++++++++
 integration-test/config/cluster.xml             |  96 +++++
 integration-test/config/common.sh               |  21 ++
 integration-test/config/etc-hosts               |  18 +
 integration-test/config/install-storm.sh        |  39 ++
 integration-test/config/install-zookeeper.sh    |  20 +
 integration-test/config/storm.yaml              |  33 ++
 integration-test/pom.xml                        | 250 ++++++++++++
 integration-test/run-it.sh                      |  83 ++++
 .../org/apache/storm/ExclamationTopology.java   |  93 +++++
 .../org/apache/storm/debug/DebugHelper.java     |  39 ++
 .../storm/st/topology/TestableTopology.java     |  30 ++
 .../topology/window/SlidingTimeCorrectness.java | 170 +++++++++
 .../window/SlidingWindowCorrectness.java        | 157 ++++++++
 .../window/TumblingTimeCorrectness.java         | 167 ++++++++
 .../window/TumblingWindowCorrectness.java       | 154 ++++++++
 .../storm/st/topology/window/data/FromJson.java |  22 ++
 .../storm/st/topology/window/data/TimeData.java | 110 ++++++
 .../st/topology/window/data/TimeDataWindow.java |  91 +++++
 .../apache/storm/st/utils/StringDecorator.java  |  37 ++
 .../org/apache/storm/st/utils/TimeUtil.java     |  54 +++
 .../test/java/org/apache/storm/st/DemoTest.java |  85 +++++
 .../apache/storm/st/helper/AbstractTest.java    |  27 ++
 .../apache/storm/st/meta/TestngListener.java    |  97 +++++
 .../st/tests/window/SlidingWindowTest.java      | 187 +++++++++
 .../st/tests/window/TumblingWindowTest.java     |  99 +++++
 .../org/apache/storm/st/utils/AssertUtil.java   |  71 ++++
 .../org/apache/storm/st/wrapper/LogData.java    |  66 ++++
 .../apache/storm/st/wrapper/StormCluster.java   | 118 ++++++
 .../org/apache/storm/st/wrapper/TopoWrap.java   | 378 +++++++++++++++++++
 .../src/test/resources/storm-conf/storm.yaml    |  33 ++
 pom.xml                                         |   3 +
 35 files changed, 3099 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5c66aba..636570f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,6 @@ logs
 
 # virtual machine crash logs, see 
http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
+
+# ignore vagrant files
+/integration-test/config/.vagrant/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 05e24fe..44942e1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,9 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
+addons:
+  hosts:
+    - node1
 
 env:
   - MODULES=storm-core
@@ -23,8 +26,9 @@ before_install:
   - nvm install 0.12.2
   - nvm use 0.12.2
 install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd`
-script: /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES
-sudo: false
+script:
+  - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES && /bin/bash 
./integration-test/run-it.sh
+sudo: true
 cache:
   directories:
     - "$HOME/.m2/repository"

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
new file mode 100644
index 0000000..36c70a1
--- /dev/null
+++ b/integration-test/README.md
@@ -0,0 +1,59 @@
+End to end storm integration tests
+==================================
+
+Running tests end-to-end
+------------------------
+Assumption:
+A single version of storm binary zip such as 
`storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip` is present
+The following command will bring up a vagrant cluster.
+```sh
+cd integration-test/config
+vagrant up
+```
+This automatically will run `integration-test/run-it.sh`.
+This brings up a vagrant machine, with storm and zookeeper daemons.
+And runs all the tests against it.
+
+Running tests for development & debugging
+=========================================
+```vagrant up``` command is steup as a complete auto-pilot.
+Following describes how we can run individual tests against this vagrant 
cluster or any other cluster.
+
+Configs for running
+-------------------
+The supplied configuration will run tests against vagrant setup. However, it 
can be changed to use a different cluster.
+Change `integration-test/src/test/resources/storm.yaml` as necessary.
+
+Running all tests manually
+--------------------------
+To run all tests:
+```sh
+mvn clean package -DskipTests && mvn test
+```
+
+To run a single test:
+```sh
+mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest
+```
+
+Running tests from IDE
+----------------------
+You might have to enable intellij profile to make your IDE happy.
+Make sure that the following is run before tests are launched.
+```sh
+mvn package -DskipTests
+```
+
+Running tests with custom storm version
+---------------------------------------
+You can supply custom storm version using `-Dstorm.version=<storm-version>` 
property to all the maven commands.
+```sh
+mvn clean package -DskipTests -Dstorm.version=<storm-version>
+mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
+```
+
+To find version of the storm that you are running run `storm version` command.
+
+Code
+----
+Start off by looking at file 
[DemoTest.java](https://github.com/apache/storm/integration-test/blob/master/src/test/java/org/apache/storm/st/DemoTest.java).

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/Vagrantfile
----------------------------------------------------------------------
diff --git a/integration-test/config/Vagrantfile 
b/integration-test/config/Vagrantfile
new file mode 100644
index 0000000..7898d9b
--- /dev/null
+++ b/integration-test/config/Vagrantfile
@@ -0,0 +1,183 @@
+# -*- mode: ruby; compile-command: "vagrant destroy -f; vagrant up" -*-
+# vi: set ft=ruby :
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+require 'uri'
+# Vagrantfile API/syntax version. Don't touch unless you know what you're 
doing!
+VAGRANTFILE_API_VERSION = "2"
+STORM_BOX_TYPE = "hashicorp/precise64"
+STORM_ZIP = Dir.glob("../../storm-dist/binary/target/**/*.zip")
+if(STORM_ZIP.length != 1)
+  raise "expected one storm-binary found: " + STORM_ZIP.join(",")
+end
+STORM_ARCHIVE = STORM_ZIP[0]
+STORM_VERSION = File.basename(STORM_ARCHIVE, '.*')
+STORM_SUPERVISOR_COUNT = 2
+
+Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
+
+  config.vm.box = STORM_BOX_TYPE
+  #config.hostmanager.manage_host = true
+  #config.hostmanager.enabled = true
+
+  if Vagrant.has_plugin?("vagrant-cachier")
+    # Configure cached packages to be shared between instances of the same 
base box.
+    # More info on the "Usage" link above
+    config.cache.scope = :box
+
+    # OPTIONAL: If you are using VirtualBox, you might want to use that to 
enable
+    # NFS for shared folders. This is also very useful for vagrant-libvirt if 
you
+    # want bi-directional sync
+    config.cache.synced_folder_opts = {
+      type: :nfs,
+      # The nolock option can be useful for an NFSv3 client that wants to 
avoid the
+      # NLM sideband protocol. Without this option, apt-get might hang if it 
tries
+      # to lock files needed for /var/cache/* operations. All of this can be 
avoided
+      # by using NFSv4 everywhere. Please note that the tcp option is not the 
default.
+      mount_options: ['rw', 'vers=3', 'tcp', 'nolock']
+    }
+  end
+
+  if(!File.exist?(STORM_ARCHIVE))
+    `wget -N #{STORM_DIST_URL}`
+  end
+
+  config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm"
+  config.vm.synced_folder "~/.m2", "/home/vagrant/.m2"
+
+  config.vm.define "node1" do |node1|
+    node1.vm.provider "virtualbox" do |v|
+      v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
+    end
+    node1.vm.network "private_network", ip: "192.168.50.3"
+    node1.vm.hostname = "node1"
+    node1.vm.provision :shell, :inline => "echo run integration test; whoami; 
env; cd /home/vagrant/build/vagrant/storm/; pwd; bash 
integration-test/run-it.sh", privileged: false
+    #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts 
/etc/hosts"
+  end
+
+
+  # All Vagrant configuration is done here. The most common configuration
+  # options are documented and commented below. For a complete reference,
+  # please see the online documentation at vagrantup.com.
+
+  # Every Vagrant virtual environment requires a box to build off of.
+  #config.vm.box = "precise32"
+
+  # The url from where the 'config.vm.box' box will be fetched if it
+  # doesn't already exist on the user's system.
+  #config.vm.box_url = "http://files.vagrantup.com/precise32.box";
+
+  # Create a forwarded port mapping which allows access to a specific port
+  # within the machine from a port on the host machine. In the example below,
+  # accessing "localhost:8080" will access port 80 on the guest machine.
+  # config.vm.network :forwarded_port, guest: 8080, host: 8080
+
+  # Create a private network, which allows host-only access to the machine
+  # using a specific IP.
+  config.vm.network :private_network, ip: "192.168.100.100"
+
+  # Create a public network, which generally matched to bridged network.
+  # Bridged networks make the machine appear as another physical device on
+  # your network.
+  # config.vm.network :public_network
+
+  # If true, then any SSH connections made will enable agent forwarding.
+  # Default value: false
+  # config.ssh.forward_agent = true
+
+  # Share an additional folder to the guest VM. The first argument is
+  # the path on the host to the actual folder. The second argument is
+  # the path on the guest to mount the folder. And the optional third
+  # argument is a set of non-required options.
+  # config.vm.synced_folder "../data", "/vagrant_data"
+
+  # Provider-specific configuration so you can fine-tune various
+  # backing providers for Vagrant. These expose provider-specific options.
+  # Example for VirtualBox:
+  #
+   config.vm.provider :virtualbox do |vb|
+  #   # Don't boot with headless mode
+     vb.gui = false
+  #
+  #   # Use VBoxManage to customize the VM. For example to change memory:
+     vb.customize ["modifyvm", :id, "--memory", "3072"]
+   end
+  #
+  # View the documentation for the provider you're using for more
+  # information on available options.
+
+  # Enable provisioning with Puppet stand alone.  Puppet manifests
+  # are contained in a directory path relative to this Vagrantfile.
+  # You will need to create the manifests directory and a manifest in
+  # the file precise32.pp in the manifests_path directory.
+  #
+  # An example Puppet manifest to provision the message of the day:
+  #
+  # # group { "puppet":
+  # #   ensure => "present",
+  # # }
+  # #
+  # # File { owner => 0, group => 0, mode => 0644 }
+  # #
+  # # file { '/etc/motd':
+  # #   content => "Welcome to your Vagrant-built virtual machine!
+  # #               Managed by Puppet.\n"
+  # # }
+  #
+  # config.vm.provision :puppet do |puppet|
+  #   puppet.manifests_path = "manifests"
+  #   puppet.manifest_file  = "site.pp"
+  # end
+
+  # Enable provisioning with chef solo, specifying a cookbooks path, roles
+  # path, and data_bags path (all relative to this Vagrantfile), and adding
+  # some recipes and/or roles.
+  #
+  # config.vm.provision :chef_solo do |chef|
+  #   chef.cookbooks_path = "../my-recipes/cookbooks"
+  #   chef.roles_path = "../my-recipes/roles"
+  #   chef.data_bags_path = "../my-recipes/data_bags"
+  #   chef.add_recipe "mysql"
+  #   chef.add_role "web"
+  #
+  #   # You may also specify custom JSON attributes:
+  #   chef.json = { :mysql_password => "foo" }
+  # end
+
+  # Enable provisioning with chef server, specifying the chef server URL,
+  # and the path to the validation key (relative to this Vagrantfile).
+  #
+  # The Opscode Platform uses HTTPS. Substitute your organization for
+  # ORGNAME in the URL and validation key.
+  #
+  # If you have your own Chef Server, use the appropriate URL, which may be
+  # HTTP instead of HTTPS depending on your configuration. Also change the
+  # validation key to validation.pem.
+  #
+  # config.vm.provision :chef_client do |chef|
+  #   chef.chef_server_url = "https://api.opscode.com/organizations/ORGNAME";
+  #   chef.validation_key_path = "ORGNAME-validator.pem"
+  # end
+  #
+  # If you're using the Opscode platform, your validator client is
+  # ORGNAME-validator, replacing ORGNAME with your organization name.
+  #
+  # If you have your own Chef Server, the default validation client name is
+  # chef-validator, unless you changed the configuration.
+  #
+  #   chef.validation_client_name = "ORGNAME-validator"
+end

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/cluster.xml
----------------------------------------------------------------------
diff --git a/integration-test/config/cluster.xml 
b/integration-test/config/cluster.xml
new file mode 100644
index 0000000..d37e6e6
--- /dev/null
+++ b/integration-test/config/cluster.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration scan="true" scanPeriod="60 seconds">
+ <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>/var/log/storm/${logfile.name}</file>
+    <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>9</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy 
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>100MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+    </encoder>
+ </appender>
+
+ <appender name="ACCESS" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>/var/log/storm/access.log</file>
+    <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>9</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy 
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>100MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="METRICS" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>/var/log/storm/metrics.log</file>
+    <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>metrics.log.%i</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>9</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy 
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>2MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%d %-8r %m%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="A1"/>
+  </root>
+  
+  <logger name="backtype.storm.messaging.netty">
+    <level value="WARN" />
+    <appender-ref ref="A1" />
+  </logger>
+  
+  <logger name="backtype.storm">
+    <level value="DEBUG" />
+    <appender-ref ref="A1" />
+  </logger>
+
+  <logger name="backtype.storm.security.auth.authorizer" additivity="false">
+    <level value="INFO" />
+    <appender-ref ref="ACCESS" />
+  </logger>
+
+  <logger name="backtype.storm.metric.LoggingMetricsConsumer" 
additivity="false" >
+    <level value="INFO"/>
+    <appender-ref ref="METRICS"/>
+  </logger>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/common.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/common.sh 
b/integration-test/config/common.sh
new file mode 100644
index 0000000..c89319c
--- /dev/null
+++ b/integration-test/config/common.sh
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+USER_SCRIPT="user-script.sh"
+[[ -f $USER_SCRIPT ]] && echo "Running ${USER_SCRIPT}" && bash ${USER_SCRIPT} 
|| echo "${USER_SCRIPT} not found/executed, continuing."
+#apt-get update
+#apt-get --yes remove openjdk-6-jre-headless
+#apt-get --yes install openjdk-7-jdk

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/etc-hosts
----------------------------------------------------------------------
diff --git a/integration-test/config/etc-hosts 
b/integration-test/config/etc-hosts
new file mode 100644
index 0000000..45e2aa6
--- /dev/null
+++ b/integration-test/config/etc-hosts
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+127.0.0.1 localhost localhost
+192.168.100.100 node-1

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/install-storm.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-storm.sh 
b/integration-test/config/install-storm.sh
new file mode 100644
index 0000000..2731abe
--- /dev/null
+++ b/integration-test/config/install-storm.sh
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# $1 is the storm binary zip file
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+groupadd storm
+useradd --gid storm --home-dir /home/storm --create-home --shell /bin/bash 
storm
+
+unzip -o "$1" -d /usr/share/
+chown -R storm:storm /usr/share/apache-storm*
+ln -s /usr/share/apache-storm* /usr/share/storm
+ln -s /usr/share/storm/bin/storm /usr/bin/storm
+
+mkdir /etc/storm
+chown storm:storm /etc/storm
+
+rm /usr/share/storm/conf/storm.yaml
+cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/
+cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/
+ln -s /usr/share/storm/conf/storm.yaml /etc/storm/storm.yaml
+
+mkdir /var/log/storm
+chown storm:storm /var/log/storm
+
+#sed -i 's/${storm.home}\/logs/\/var\/log\/storm/g' 
/usr/share/storm/logback/cluster.xml

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/install-zookeeper.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-zookeeper.sh 
b/integration-test/config/install-zookeeper.sh
new file mode 100644
index 0000000..a81a07c
--- /dev/null
+++ b/integration-test/config/install-zookeeper.sh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5*
+service zookeeper stop
+echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg
+service zookeeper start

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/storm.yaml
----------------------------------------------------------------------
diff --git a/integration-test/config/storm.yaml 
b/integration-test/config/storm.yaml
new file mode 100644
index 0000000..eca352f
--- /dev/null
+++ b/integration-test/config/storm.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+storm.zookeeper.servers:
+    - "node1"
+
+nimbus.seeds: ["node1"]
+
+# netty transport
+storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
+storm.messaging.netty.buffer_size: 16384
+storm.messaging.netty.max_retries: 10
+storm.messaging.netty.min_wait_ms: 1000
+storm.messaging.netty.max_wait_ms: 5000
+
+drpc.servers:
+  - "node1"
+
+supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 
6709]

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
new file mode 100755
index 0000000..47eb244
--- /dev/null
+++ b/integration-test/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <name>storm-integration-test</name>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>storm-integration-test</artifactId>
+    <version>0.3</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- see comment below... This fixes an annoyance with intellij -->
+        <provided.scope>provided</provided.scope>
+        <storm.version>1.0.1.2.5.0.0-781</storm.version>
+        <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        <regression.downloadWorkerLogs>false</regression.downloadWorkerLogs>
+        <storm.conf.dir>/etc/storm/conf</storm.conf.dir>
+        <hadoop.conf.dir>/etc/hadoop/conf</hadoop.conf.dir>
+    </properties>
+
+    <profiles>
+        <profile>
+            <id>intellij</id>
+            <properties>
+                <provided.scope>compile</provided.scope>
+                
<regression.downloadWorkerLogs>true</regression.downloadWorkerLogs>
+                <storm.conf.dir>src/test/resources/storm-conf/</storm.conf.dir>
+                
<hadoop.conf.dir>src/main/resources/hadoop-conf/</hadoop.conf.dir>
+            </properties>
+        </profile>
+    </profiles>
+
+    <repositories>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>central</id>
+            <url>http://repo1.maven.org/maven2/</url>
+        </repository>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <id>clojars</id>
+            <url>https://clojars.org/repo/</url>
+        </repository>
+        <repository>
+            <id>hortonworks</id>
+            
<url>http://nexus-private.hortonworks.com/nexus/content/groups/public/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.8.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.seleniumhq.selenium</groupId>
+            <artifactId>selenium-firefox-driver</artifactId>
+            <version>2.45.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.seleniumhq.selenium</groupId>
+            <artifactId>selenium-support</artifactId>
+            <version>2.45.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-solr</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-starter</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>2.3.2</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-resources-plugin</artifactId>
+                    <version>2.6</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>2.18.1</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-shade-plugin</artifactId>
+                    <version>2.4.1</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    
<redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
+                    <forkCount>1C</forkCount>
+                    <argLine>-Xmx1024m</argLine>
+                    <properties>
+                        <property>
+                            <name>listener</name>
+                            
<value>org.apache.storm.st.meta.TestngListener</value>
+                        </property>
+                    </properties>
+                    <systemPropertyVariables>
+                        
<regression.downloadWorkerLogs>${regression.downloadWorkerLogs}</regression.downloadWorkerLogs>
+                    </systemPropertyVariables>
+                    <additionalClasspathElements>
+                        <!-- Hack to get the dir in CP through CLI and idea -->
+                        
<additionalClasspathElement>${storm.conf.dir}</additionalClasspathElement>
+                        
<additionalClasspathElement>${hadoop.conf.dir}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.1}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.2}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.3}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.4}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.5}</additionalClasspathElement>
+                        
<additionalClasspathElement>${extra.classpath.6}</additionalClasspathElement>
+                    </additionalClasspathElements>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    
<createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>${hadoop.conf.dir}</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/run-it.sh
----------------------------------------------------------------------
diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh
new file mode 100755
index 0000000..3b4a861
--- /dev/null
+++ b/integration-test/run-it.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+# -*- compile-command: "cd config/ && vagrant destroy -f; vagrant up" -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -x
+set -e
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+echo SCRIPT_DIR="${SCRIPT_DIR}"
+STORM_SRC_DIR=$(dirname ${SCRIPT_DIR})
+echo SCRIPT_SRC_DIR="${SCRIPT_SRC_DIR}"
+function die() {
+  echo $*
+  exit 1
+}
+function list_storm_processes() {
+    (ps -ef | grep -i -e zookeeper | grep -v grep) && (ps -ef | grep -i -e 
storm.home  | grep -v grep)
+}
+
+list_storm_processes || true
+# increasing swap space so we can run lots of workers
+sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
+sudo mkswap /swapfile.img
+sudo swapon /swapfile.img
+
+if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
+    sudo apt-get update
+    sudo apt-get -y install python-software-properties
+    sudo apt-add-repository -y ppa:webupd8team/java
+    sudo apt-get update
+    echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select 
true" | sudo debconf-set-selections
+    sudo apt-get install -y oracle-java8-installer
+    sudo apt-get -y install maven
+    java -version
+    mvn --version
+    export MAVEN_OPTS="-Xmx3000m"
+else
+    ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed 
by travis ci
+    (cd ${STORM_SRC_DIR} && mvn clean install -DskipTests=true) || die "maven 
install command failed"
+    (cd ${STORM_SRC_DIR}/storm-dist/binary && mvn package -Dgpg.skip=true) || 
die "maven package command failed"
+fi
+storm_binary_zip=$(find ${STORM_SRC_DIR}/storm-dist -iname '*.zip')
+storm_binary_name=$(basename ${storm_binary_zip})
+export STORM_VERSION=$(grep -oPe '\d.*(?=.zip)' <<<${storm_binary_name})
+echo "Using storm version:" ${STORM_VERSION}
+
+# setup storm cluster
+list_storm_processes || true
+sudo bash ${SCRIPT_DIR}/config/common.sh
+sudo bash ${SCRIPT_DIR}/config/install-zookeeper.sh
+sudo bash ${SCRIPT_DIR}/config/install-storm.sh $storm_binary_zip
+export JAVA_HOME
+env
+function start_storm_process() {
+    echo starting: storm $1
+    sudo su storm -c "export JAVA_HOME=${JAVA_HOME} && cd /home/storm && storm 
$1" &
+}
+start_storm_process nimbus
+start_storm_process ui
+start_storm_process supervisor
+start_storm_process logviewer
+#start_storm_process drpc
+pushd "${SCRIPT_DIR}"
+mvn clean package  -DskipTests -Dstorm.version=${STORM_VERSION}
+for i in {1..20} ; do
+    list_storm_processes && break
+    sleep 6
+done
+list_storm_processes
+mvn test -DfailIfNoTests=false -Dstorm.version=${STORM_VERSION} 
-Dui.url=http://localhost:8744

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java 
b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
new file mode 100644
index 0000000..d464608
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This is a basic example of a Storm topology.
+ */
+public class ExclamationTopology {
+
+  public static final String WORD = "word";
+  public static final String EXCLAIM_1 = "exclaim1";
+  public static final String EXCLAIM_2 = "exclaim2";
+
+  public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+      _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    StormTopology topology = getStormTopology();
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology);
+    }
+    else {
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("test", conf, topology);
+      Utils.sleep(10000);
+      cluster.killTopology("test");
+      cluster.shutdown();
+    }
+  }
+
+  public static StormTopology getStormTopology() {
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(WORD, new TestWordSpout(), 10);
+    builder.setBolt(EXCLAIM_1, new ExclamationTopology.ExclamationBolt(), 
3).shuffleGrouping(WORD);
+    builder.setBolt(EXCLAIM_2, new ExclamationTopology.ExclamationBolt(), 
2).shuffleGrouping(EXCLAIM_1);
+    return builder.createTopology();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java 
b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
new file mode 100644
index 0000000..97c0554
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.debug;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class DebugHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DebugHelper.class);
+
+    public static void printClassPath() {
+        URL[] urls = getClassPaths();
+        LOG.info("classpath:" + StringUtils.join(urls, ':'));
+    }
+
+    public static URL[] getClassPaths() {
+        ClassLoader cl = ClassLoader.getSystemClassLoader();
+        return ((URLClassLoader)cl).getURLs();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
new file mode 100644
index 0000000..3823310
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology;
+
+import org.apache.storm.generated.StormTopology;
+
+import java.util.List;
+
+public interface TestableTopology {
+    String DUMMY_FIELD = "dummy";
+    List<String> getExpectedOutput();
+    StormTopology newTopology();
+    String getBoltName();
+    String getSpoutName();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
new file mode 100644
index 0000000..430449b
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.st.topology.TestableTopology;
+import org.apache.storm.st.topology.window.data.TimeData;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingTimeCorrectness implements TestableTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SlidingTimeCorrectness.class);
+    private final int windowSec;
+    private final int slideSec;
+    private final String spoutName;
+    private final String boltName;
+
+    public SlidingTimeCorrectness(int windowSec, int slideSec) {
+        this.windowSec = windowSec;
+        this.slideSec = slideSec;
+        final String prefix = this.getClass().getSimpleName() + "-winSec" + 
windowSec + "slideSec" + slideSec;
+        spoutName = prefix + "IncrementingSpout";
+        boltName = prefix + "VerificationBolt";
+    }
+
+    public String getBoltName() {
+        return boltName;
+    }
+
+    public String getSpoutName() {
+        return spoutName;
+    }
+
+    public StormTopology newTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(getSpoutName(), new IncrementingSpout(), 2);
+        builder.setBolt(getBoltName(),
+                new VerificationBolt()
+                        .withWindow(new BaseWindowedBolt.Duration(windowSec, 
TimeUnit.SECONDS),
+                                new BaseWindowedBolt.Duration(slideSec, 
TimeUnit.SECONDS))
+                        .withTimestampField(TimeData.getTimestampFieldName())
+                        .withLag(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS)),
+                1)
+                .globalGrouping(getSpoutName());
+        return builder.createTopology();
+    }
+
+    public List<String> getExpectedOutput() {
+        return Lists.newArrayList(
+                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() 
= " + windowSec),
+                StringDecorator.decorate(getBoltName(), "newTuples.size() = " 
+ slideSec),
+                StringDecorator.decorate(getBoltName(), "expiredTuples.size() 
= " + slideSec)
+        );
+    }
+
+    public static class IncrementingSpout extends BaseRichSpout {
+        private static final Logger LOG = 
LoggerFactory.getLogger(IncrementingSpout.class);
+        private SpoutOutputCollector collector;
+        private static int currentNum;
+        private static Random rng = new Random();
+        private String componentId;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(TimeData.getFields());
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            TimeUtil.sleepMilliSec(rng.nextInt(800));
+            currentNum++;
+            TimeData data = TimeData.newData(currentNum);
+            final Values tuple = data.getValues();
+            collector.emit(tuple);
+            LOG.info(StringDecorator.decorate(componentId, data.toString()));
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            LOG.info("Received ACK for msgId : " + msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            LOG.info("Received FAIL for msgId : " + msgId);
+        }
+    }
+
+    public static class VerificationBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+        private String componentId;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+            LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+            LOG.info("newTuples.size() = " + newTuples.size());
+            LOG.info("expiredTuples.size() = " + expiredTuples.size());
+            Collection<TimeData> dataInWindow = 
Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() {
+                @Nullable
+                @Override
+                public TimeData apply(@Nullable Tuple input) {
+                    return TimeData.fromTuple(input);
+                }
+            });
+            final String jsonData = TimeData.toString(dataInWindow);
+            LOG.info(StringDecorator.decorate(componentId, jsonData));
+            collector.emit(new Values("dummyValue"));
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(DUMMY_FIELD));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
new file mode 100644
index 0000000..33ee004
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.topology.TestableTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowCorrectness implements TestableTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SlidingWindowCorrectness.class);
+    private static final String NUMBER_FIELD = "number";
+    private static final String STRING_FIELD = "numAsStr";
+    private final int windowSize;
+    private final int slideSize;
+    private final String spoutName;
+    private final String boltName;
+
+    public SlidingWindowCorrectness(int windowSize, int slideSize) {
+        this.windowSize = windowSize;
+        this.slideSize = slideSize;
+        final String prefix = this.getClass().getSimpleName() + "-winSize" + 
windowSize + "slideSize" + slideSize;
+        spoutName = prefix + "IncrementingSpout";
+        boltName = prefix + "VerificationBolt";
+    }
+
+    public String getBoltName() {
+        return boltName;
+    }
+
+    public String getSpoutName() {
+        return spoutName;
+    }
+
+    public StormTopology newTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
+        builder.setBolt(getBoltName(),
+                new VerificationBolt()
+                        .withLag(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS))
+                        .withWindow(new BaseWindowedBolt.Count(windowSize), 
new BaseWindowedBolt.Count(slideSize)),
+                1)
+                .shuffleGrouping(getSpoutName());
+        return builder.createTopology();
+    }
+
+    public List<String> getExpectedOutput() {
+        return Lists.newArrayList(
+                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() 
= " + windowSize),
+                StringDecorator.decorate(getBoltName(), "newTuples.size() = " 
+ slideSize),
+                StringDecorator.decorate(getBoltName(), "expiredTuples.size() 
= " + slideSize)
+        );
+    }
+
+    public static class IncrementingSpout extends BaseRichSpout {
+        private static final Logger LOG = 
LoggerFactory.getLogger(IncrementingSpout.class);
+        private SpoutOutputCollector collector;
+        private static int currentNum;
+        private static Random rng = new Random();
+        private String componentId;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            TimeUtil.sleepMilliSec(rng.nextInt(10));
+            currentNum++;
+            final String numAsStr = "str(" + currentNum + ")str";
+            final Values tuple = new Values(currentNum, numAsStr);
+            LOG.info(StringDecorator.decorate(componentId, tuple.toString()));
+            collector.emit(tuple, currentNum);
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            LOG.info("Received ACK for msgId : " + msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            LOG.info("Received FAIL for msgId : " + msgId);
+        }
+    }
+
+    public static class VerificationBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+        private String componentId;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+            LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+            LOG.info("newTuples.size() = " + newTuples.size());
+            LOG.info("expiredTuples.size() = " + expiredTuples.size());
+            LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " 
+ tuplesInWindow.toString()));
+            collector.emit(new Values("dummyValue"));
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(DUMMY_FIELD));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
new file mode 100644
index 0000000..a77836f
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.st.topology.TestableTopology;
+import org.apache.storm.st.topology.window.data.TimeData;
+import org.apache.storm.st.utils.TimeUtil;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.utils.StringDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class TumblingTimeCorrectness implements TestableTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TumblingTimeCorrectness.class);
+    private final int tumbleSec;
+    private final String spoutName;
+    private final String boltName;
+
+    public TumblingTimeCorrectness(int timbleSec) {
+        this.tumbleSec = timbleSec;
+        final String prefix = this.getClass().getSimpleName() + "-timbleSec" + 
timbleSec;
+        spoutName = prefix + "IncrementingSpout";
+        boltName = prefix + "VerificationBolt";
+    }
+
+    public String getBoltName() {
+        return boltName;
+    }
+
+    public String getSpoutName() {
+        return spoutName;
+    }
+
+    public StormTopology newTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(getSpoutName(), new IncrementingSpout(), 2);
+        builder.setBolt(getBoltName(),
+                new VerificationBolt()
+                        .withTumblingWindow(new 
BaseWindowedBolt.Duration(tumbleSec, TimeUnit.SECONDS))
+                        .withLag(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS))
+                        .withTimestampField(TimeData.getTimestampFieldName()),
+                1)
+                .globalGrouping(getSpoutName());
+        return builder.createTopology();
+    }
+
+    public List<String> getExpectedOutput() {
+        return Lists.newArrayList(
+                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() 
= " + tumbleSec),
+                StringDecorator.decorate(getBoltName(), "newTuples.size() = " 
+ tumbleSec),
+                StringDecorator.decorate(getBoltName(), "expiredTuples.size() 
= " + tumbleSec)
+        );
+    }
+
+    public static class IncrementingSpout extends BaseRichSpout {
+        private static final Logger LOG = 
LoggerFactory.getLogger(IncrementingSpout.class);
+        private SpoutOutputCollector collector;
+        private static int currentNum;
+        private static Random rng = new Random();
+        private String componentId;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(TimeData.getFields());
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            TimeUtil.sleepMilliSec(rng.nextInt(800));
+            currentNum++;
+            TimeData data = TimeData.newData(currentNum);
+            final Values tuple = data.getValues();
+            collector.emit(tuple);
+            LOG.info(StringDecorator.decorate(componentId, data.toString()));
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            LOG.info("Received ACK for msgId : " + msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            LOG.info("Received FAIL for msgId : " + msgId);
+        }
+    }
+
+    public static class VerificationBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+        private String componentId;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+            LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+            LOG.info("newTuples.size() = " + newTuples.size());
+            LOG.info("expiredTuples.size() = " + expiredTuples.size());
+            Collection<TimeData> dataInWindow = 
Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() {
+                @Nullable
+                @Override
+                public TimeData apply(@Nullable Tuple input) {
+                    return TimeData.fromTuple(input);
+                }
+            });
+            final String jsonData = TimeData.toString(dataInWindow);
+            LOG.info(StringDecorator.decorate(componentId, jsonData));
+            collector.emit(new Values("dummyValue"));
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(DUMMY_FIELD));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
new file mode 100644
index 0000000..22c6d75
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.topology.TestableTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class TumblingWindowCorrectness implements TestableTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TumblingWindowCorrectness.class);
+    private static final String NUMBER_FIELD = "number";
+    private static final String STRING_FIELD = "numAsStr";
+    private final int tumbleSize;
+    private final String spoutName;
+    private final String boltName;
+
+    public TumblingWindowCorrectness(final int tumbleSize) {
+        this.tumbleSize = tumbleSize;
+        final String prefix = this.getClass().getSimpleName() + "-tubleSize" + 
tumbleSize;
+        spoutName = prefix + "IncrementingSpout";
+        boltName = prefix + "VerificationBolt";
+    }
+
+    public String getBoltName() {
+        return boltName;
+    }
+
+    public String getSpoutName() {
+        return spoutName;
+    }
+
+    public StormTopology newTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
+        builder.setBolt(getBoltName(),
+                new VerificationBolt()
+                        .withLag(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS))
+                        .withTumblingWindow(new 
BaseWindowedBolt.Count(tumbleSize)), 1)
+                .shuffleGrouping(getSpoutName());
+        return builder.createTopology();
+    }
+
+    public List<String> getExpectedOutput() {
+        return Lists.newArrayList(
+                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() 
= " + tumbleSize),
+                StringDecorator.decorate(getBoltName(), "newTuples.size() = " 
+ tumbleSize),
+                StringDecorator.decorate(getBoltName(), "expiredTuples.size() 
= " + tumbleSize)
+        );
+    }
+
+    public static class IncrementingSpout extends BaseRichSpout {
+        private static final Logger LOG = 
LoggerFactory.getLogger(IncrementingSpout.class);
+        private SpoutOutputCollector collector;
+        private static int currentNum;
+        private static Random rng = new Random();
+        private String componentId;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            TimeUtil.sleepMilliSec(rng.nextInt(10));
+            currentNum++;
+            final String numAsStr = "str(" + currentNum + ")str";
+            final Values tuple = new Values(currentNum, numAsStr);
+            LOG.info(StringDecorator.decorate(componentId, tuple.toString()));
+            collector.emit(tuple, currentNum);
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            LOG.info("Received ACK for msgId : " + msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            LOG.info("Received FAIL for msgId : " + msgId);
+        }
+    }
+
+    public static class VerificationBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+        private String componentId;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+            LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+            LOG.info("newTuples.size() = " + newTuples.size());
+            LOG.info("expiredTuples.size() = " + expiredTuples.size());
+            LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " 
+ tuplesInWindow.toString()));
+            collector.emit(new Values("dummyValue"));
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(DUMMY_FIELD));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
new file mode 100644
index 0000000..d749abf
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+public interface FromJson<T> {
+    T fromJson(String jsonStr);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
new file mode 100644
index 0000000..cd2c7a5
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collection;
+import java.util.Date;
+
+public class TimeData implements Comparable<TimeData>, FromJson<TimeData> {
+    public static final TimeData CLS = new TimeData(-1);
+    private static final String NUMBER_FIELD_NAME = "number";
+    private static final String STRING_FIELD_NAME = "dateAsStr";
+    private static final String TIMESTAMP_FIELD_NAME = "date";
+    static final Gson gson = new 
GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").create();
+    private final int num;
+    private final Date now;
+    private final long timestamp;
+
+    private TimeData(int num) {
+        this(num, new Date());
+    }
+
+    private TimeData(int num, Date date) {
+        this.num = num;
+        this.now = date;
+        this.timestamp = date.getTime();
+    }
+
+    public static TimeData newData(int num) {
+        return new TimeData(num);
+    }
+
+    public static TimeData fromTuple(Tuple tuple) {
+        return new TimeData(tuple.getIntegerByField(NUMBER_FIELD_NAME), new 
Date(tuple.getLongByField(TIMESTAMP_FIELD_NAME)));
+    }
+
+    public TimeData fromJson(String jsonStr) {
+        return gson.fromJson(jsonStr, TimeData.class);
+    }
+
+    public String toString() {
+        return gson.toJson(this);
+    }
+
+    public static String toString(Collection<TimeData> elements) {
+        return gson.toJson(elements);
+    }
+
+    public Values getValues() {
+        return new Values(num, now.toString(), timestamp);
+    }
+
+    public static String getTimestampFieldName() {
+        return TIMESTAMP_FIELD_NAME;
+    }
+
+    public Date getDate() {
+        return now;
+    }
+
+    public static Fields getFields() {
+        return new Fields(NUMBER_FIELD_NAME, STRING_FIELD_NAME, 
TIMESTAMP_FIELD_NAME);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TimeData data = (TimeData) o;
+
+        if (num != data.num) return false;
+        if (timestamp != data.timestamp) return false;
+        return now.equals(data.now);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = num;
+        result = 31 * result + now.hashCode();
+        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        return result;
+    }
+
+    @Override
+    public int compareTo(TimeData o) {
+        return Long.compare(timestamp, o.timestamp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
new file mode 100644
index 0000000..d6cb9d6
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.gson.reflect.TypeToken;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class TimeDataWindow extends ArrayList<TimeData> implements 
FromJson<TimeDataWindow> {
+    public static final TimeDataWindow CLS = new TimeDataWindow();
+    private static final Type listType = new TypeToken<List<TimeData>>() 
{}.getType();
+
+    private TimeDataWindow() {
+    }
+
+    private TimeDataWindow(List<TimeData> data) {
+        super(data);
+    }
+
+    public static TimeDataWindow newInstance(Collection<TimeData> data) {
+        final List<TimeData> dataCopy = new ArrayList<>(data);
+        Collections.sort(dataCopy);
+        return new TimeDataWindow(dataCopy);
+    }
+
+    public static TimeDataWindow newInstance(Collection<TimeData> data, 
Predicate<TimeData> predicate) {
+        return newInstance(Collections2.filter(data, predicate));
+    }
+
+    public static TimeDataWindow newInstance(Collection<TimeData> data, final 
DateTime fromDate, final DateTime toDate) {
+        return TimeDataWindow.newInstance(data, new Predicate<TimeData>() {
+            @Override
+            public boolean apply(@Nullable TimeData input) {
+                if (input == null) {
+                    return false;
+                }
+                final DateTime inputDate = new DateTime(input.getDate());
+                return inputDate.isAfter(fromDate) && 
inputDate.isBefore(toDate.plusMillis(1));
+            }
+        });
+    }
+
+    public TimeData first() {
+        return get(0);
+    }
+
+    public TimeData last() {
+        return get(size()-1);
+    }
+
+    public String getDescription() {
+        final int size = size();
+        if (size > 0) {
+            final TimeData first = first();
+            final TimeData last = last();
+            return "Total " + size + " items: " + first + " to " + last;
+        }
+        return "Total " + size + " items.";
+    }
+
+    public TimeDataWindow fromJson(String jsonStr) {
+        final List<TimeData> dataList = TimeData.gson.fromJson(jsonStr, 
listType);
+        Collections.sort(dataList);
+        return TimeDataWindow.newInstance(dataList);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java 
b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
new file mode 100644
index 0000000..34c2b65
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.utils;
+
+import org.apache.commons.lang.StringUtils;
+
+public class StringDecorator {
+
+    private static final String UNIQUE_PREFIX = 
"---bed91874d79720f7e324c43d49dba4ff---";
+
+    public static String decorate(String componentId, String decorate) {
+        return componentId + UNIQUE_PREFIX + decorate;
+    }
+
+    public static boolean isDecorated(String str) {
+        return str != null && str.contains(UNIQUE_PREFIX);
+    }
+
+    public static String[] split2(String decoratedString) {
+        return StringUtils.splitByWholeSeparator(decoratedString, 
UNIQUE_PREFIX, 2);
+    }
+}

Reply via email to