Repository: storm
Updated Branches:
  refs/heads/master fe413ab5c -> 73640f0c7


http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java 
b/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java
new file mode 100644
index 0000000..36bdfdd
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.utils;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Time helpers for the integration tests: sleeps that log instead of
+ * propagating {@link InterruptedException}, and rounding of Joda
+ * {@link DateTime} values to a multiple of a number of seconds.
+ */
+public class TimeUtil {
+    private static final Logger log = LoggerFactory.getLogger(TimeUtil.class);
+
+    /**
+     * Sleeps for {@code sec} seconds. If the thread is interrupted the
+     * exception is logged, the interrupt flag is restored and the method
+     * returns early.
+     *
+     * @param sec number of seconds to sleep
+     */
+    public static void sleepSec(int sec) {
+        try {
+            TimeUnit.SECONDS.sleep(sec);
+        } catch (InterruptedException e) {
+            log.warn("Caught exception: " + ExceptionUtils.getFullStackTrace(e));
+            // Catching InterruptedException clears the flag; set it again so
+            // callers and later blocking calls can still see the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Sleeps for {@code milliSec} milliseconds. If the thread is interrupted
+     * the exception is logged, the interrupt flag is restored and the method
+     * returns early.
+     *
+     * @param milliSec number of milliseconds to sleep
+     */
+    public static void sleepMilliSec(int milliSec) {
+        try {
+            TimeUnit.MILLISECONDS.sleep(milliSec);
+        } catch (InterruptedException e) {
+            log.warn("Caught exception: " + ExceptionUtils.getFullStackTrace(e));
+            // Catching InterruptedException clears the flag; set it again so
+            // callers and later blocking calls can still see the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Rounds {@code dateTime} down to a multiple of {@code sec} seconds,
+     * measured from the epoch. A value already on a boundary is returned
+     * unchanged.
+     *
+     * @param dateTime instant to round
+     * @param sec      rounding period in seconds
+     * @return the boundary at or before {@code dateTime}
+     * @throws ArithmeticException if {@code sec} is zero
+     */
+    public static DateTime floor(DateTime dateTime, int sec) {
+        return dateTime.minus(millisPastBoundary(dateTime, sec));
+    }
+
+    /**
+     * Returns the boundary that follows {@link #floor(DateTime, int)}, i.e.
+     * floor plus {@code sec} seconds. Note that a value that already lies on a
+     * boundary is therefore moved to the NEXT boundary, not returned as is.
+     *
+     * @param dateTime instant to round
+     * @param sec      rounding period in seconds
+     * @return the first boundary strictly after {@code dateTime}
+     * @throws ArithmeticException if {@code sec} is zero
+     */
+    public static DateTime ceil(DateTime dateTime, int sec) {
+        return dateTime.minus(millisPastBoundary(dateTime, sec)).plusSeconds(sec);
+    }
+
+    /** Milliseconds elapsed since the last {@code sec}-second boundary; never negative. */
+    private static long millisPastBoundary(DateTime dateTime, int sec) {
+        // long arithmetic: 1000 * sec overflows int once sec exceeds ~2.1 million.
+        final long periodMillis = 1000L * sec;
+        long remainder = dateTime.getMillis() % periodMillis;
+        if (remainder < 0) {
+            // % keeps the sign of the dividend; shift pre-epoch instants so
+            // that subtracting the remainder always rounds down.
+            remainder += Math.abs(periodMillis);
+        }
+        return remainder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java 
b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
new file mode 100644
index 0000000..b011254
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.storm.st.helper.AbstractTest;
+import org.apache.storm.st.wrapper.TopoWrap;
+import org.apache.storm.ExclamationTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+import org.apache.storm.st.utils.TimeUtil;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Submits {@link ExclamationTopology}, waits for it to emit some tuples and
+ * then checks that every expected exclaimed word appears in the logs of the
+ * second exclamation bolt.
+ */
+public final class DemoTest extends AbstractTest {
+    private static final Logger log = LoggerFactory.getLogger(DemoTest.class);
+    private static final Collection<String> words =
+            Lists.newArrayList("nathan", "mike", "jackson", "golda", "bertels");
+    /** Each word followed by six exclamation marks. */
+    private static final Collection<String> exclaim2Output =
+            Collections2.transform(words, new Function<String, String>() {
+                @Nullable
+                @Override
+                public String apply(@Nullable String input) {
+                    return input + "!!!!!!";
+                }
+            });
+    protected final String topologyName = this.getClass().getSimpleName();
+    private TopoWrap topo;
+
+    /**
+     * Polls the emitted counts up to 10 times, 6 seconds apart, and stops
+     * early once either threshold is exceeded. The logs are verified
+     * afterwards regardless of whether a threshold was reached.
+     */
+    @Test
+    public void testExclamationTopology() throws Exception {
+        topo = new TopoWrap(cluster, topologyName, ExclamationTopology.getStormTopology());
+        topo.submitSuccessfully();
+        final int minExclaim2Emits = 500;
+        final int minSpoutEmits = 10000;
+        for (int i = 0; i < 10; ++i) {
+            TopologyInfo topologyInfo = topo.getInfo();
+            log.info(topologyInfo.toString());
+            long wordSpoutEmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.WORD);
+            long exclaim1EmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.EXCLAIM_1);
+            long exclaim2EmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.EXCLAIM_2);
+            log.info("wordSpoutEmittedCount for spout 'word' = " + wordSpoutEmittedCount);
+            log.info("exclaim1EmittedCount = " + exclaim1EmittedCount);
+            log.info("exclaim2EmittedCount = " + exclaim2EmittedCount);
+            if (exclaim2EmittedCount > minExclaim2Emits || wordSpoutEmittedCount > minSpoutEmits) {
+                break;
+            }
+            TimeUtil.sleepSec(6);
+        }
+        // These are the URLs of the WORD component; they are only logged.
+        List<TopoWrap.ExecutorURL> wordUrls = topo.getLogUrls(ExclamationTopology.WORD);
+        log.info(wordUrls.toString());
+        final String actualOutput = topo.getLogs(ExclamationTopology.EXCLAIM_2);
+        for (String oneExpectedOutput : exclaim2Output) {
+            Assert.assertTrue(actualOutput.contains(oneExpectedOutput),
+                    "Couldn't find " + oneExpectedOutput + " in logs of " + ExclamationTopology.EXCLAIM_2);
+        }
+    }
+
+    /** Kills the topology submitted by the test, if any, and forgets it. */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        if (topo != null) {
+            try {
+                topo.killQuietly();
+            } finally {
+                topo = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java 
b/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java
new file mode 100644
index 0000000..57c4930
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.helper;
+
+import org.apache.storm.st.wrapper.StormCluster;
+
+/**
+ * Common base class of the integration tests: provides the cluster handle
+ * and sets the {@code user.timezone} system property.
+ */
+public abstract class AbstractTest {
+    static {
+        // Runs once, when this class is initialized.
+        System.setProperty("user.timezone", "UTC");
+    }
+
+    /** Cluster handle; a new one is created for every test class instance. */
+    protected final StormCluster cluster = new StormCluster();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java 
b/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java
new file mode 100644
index 0000000..ed2f9ce
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.meta;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.testng.IExecutionListener;
+import org.testng.ITestContext;
+import org.testng.ITestListener;
+import org.testng.ITestResult;
+
+import java.util.Arrays;
+
+/**
+ * TestNG listener that logs the start and the outcome of every test method.
+ * This is useful for things that are applicable to all the tests, as well as
+ * for taking actions that depend on test results.
+ */
+public class TestngListener implements ITestListener, IExecutionListener {
+    private static final Logger LOGGER = Logger.getLogger(TestngListener.class);
+    /** Horizontal rule used to separate tests in the log. */
+    private final String hr = StringUtils.repeat("-", 100);
+
+    /** Outcome label printed in the end-of-test log line. */
+    private enum RunResult {SUCCESS, FAILED, SKIPPED, TestFailedButWithinSuccessPercentage }
+
+    /** Logs the test about to run and pushes its name onto the log4j NDC. */
+    @Override
+    public void onTestStart(ITestResult result) {
+        LOGGER.info(hr);
+        LOGGER.info(
+                String.format("Testing going to start for: %s.%s(%s)", result.getTestClass().getName(),
+                        result.getName(), Arrays.toString(result.getParameters())));
+        NDC.push(result.getName());
+    }
+
+    /** Logs the outcome of a test and pops the NDC entry. */
+    private void endOfTestHook(ITestResult result, RunResult outcome) {
+        LOGGER.info(
+                String.format("Testing going to end for: %s.%s(%s) ----- Status: %s",
+                        result.getTestClass().getName(), result.getName(),
+                        Arrays.toString(result.getParameters()), outcome));
+        NDC.pop();
+        LOGGER.info(hr);
+    }
+
+    @Override
+    public void onTestSuccess(ITestResult result) {
+        endOfTestHook(result, RunResult.SUCCESS);
+    }
+
+    /** Logs the failure and, when one was recorded, the stack trace of its cause. */
+    @Override
+    public void onTestFailure(ITestResult result) {
+        endOfTestHook(result, RunResult.FAILED);
+
+        // getStackTrace() dereferences its argument, so guard against a
+        // result that carries no throwable instead of failing inside the listener.
+        final Throwable cause = result.getThrowable();
+        if (cause != null) {
+            LOGGER.info(ExceptionUtils.getStackTrace(cause));
+        }
+        LOGGER.info(hr);
+    }
+
+    @Override
+    public void onTestSkipped(ITestResult result) {
+        endOfTestHook(result, RunResult.SKIPPED);
+    }
+
+    @Override
+    public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
+        endOfTestHook(result, RunResult.TestFailedButWithinSuccessPercentage);
+    }
+
+    // The remaining callbacks are intentionally no-ops.
+
+    @Override
+    public void onStart(ITestContext context) {
+    }
+
+    @Override
+    public void onFinish(ITestContext context) {
+    }
+
+    @Override
+    public void onExecutionStart() {
+    }
+
+    @Override
+    public void onExecutionFinish() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
new file mode 100644
index 0000000..c4200a0
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.tests.window;
+
+import org.apache.storm.st.helper.AbstractTest;
+import org.apache.storm.st.wrapper.LogData;
+import org.apache.storm.st.wrapper.TopoWrap;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.st.topology.TestableTopology;
+import org.apache.storm.st.topology.window.SlidingTimeCorrectness;
+import org.apache.storm.st.topology.window.SlidingWindowCorrectness;
+import org.apache.storm.st.topology.window.data.TimeData;
+import org.apache.storm.st.topology.window.data.TimeDataWindow;
+import org.apache.storm.st.utils.TimeUtil;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.net.MalformedURLException;
+import java.util.List;
+
+/**
+ * Integration tests for sliding windows, both count based and time based.
+ * The window contents logged by the bolt are compared with windows
+ * recomputed from the data logged by the spout.
+ */
+public final class SlidingWindowTest extends AbstractTest {
+    private static final Logger log = LoggerFactory.getLogger(SlidingWindowTest.class);
+    /** Topology submitted by the current test method; null when none is running. */
+    private TopoWrap topo;
+
+    /**
+     * Pairs of {windowSize, slideSize}. Pairs containing a non-positive value
+     * exercise the argument validation of the topology.
+     */
+    @DataProvider
+    public static Object[][] generateCountWindows() {
+        final Object[][] objects = new Object[][]{
+                {-1, 10},
+                {10, -1},
+                {0, 10},
+                {10, 0},
+                {0, 0},
+                {-1, -1},
+                {5, 10},
+                {1, 1},
+                {10, 5},
+                {100, 10},
+                {100, 100},
+                {200, 100},
+                {500, 100},
+        };
+        return objects;
+    }
+
+    /**
+     * For non-positive sizes, expects {@code newTopology()} to throw
+     * {@link IllegalArgumentException}; otherwise submits the topology and
+     * verifies the count based windows.
+     */
+    @Test(dataProvider = "generateCountWindows")
+    public void testWindowCount(int windowSize, int slideSize) throws Exception {
+        final SlidingWindowCorrectness testable = new SlidingWindowCorrectness(windowSize, slideSize);
+        final String topologyName = this.getClass().getSimpleName() + "w" + windowSize + "s" + slideSize;
+        if (windowSize <= 0 || slideSize <= 0) {
+            try {
+                testable.newTopology();
+                Assert.fail("Expected IllegalArgumentException was not thrown.");
+            } catch (IllegalArgumentException ignore) {
+                return;
+            }
+        }
+        topo = new TopoWrap(cluster, topologyName, testable.newTopology());
+        runAndVerifyCount(windowSize, slideSize, testable, topo);
+    }
+
+    /**
+     * Submits the topology, waits until the spout emitted at least
+     * {@code 1000 + windowSize} and the bolt at least 5 tuples, then checks
+     * that each bolt log entry contains every spout entry of the
+     * corresponding recomputed window.
+     */
+    static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo)
+            throws TException, MalformedURLException {
+        topo.submitSuccessfully();
+        final int minSpoutEmits = 1000 + windowSize;
+        final int minBoltEmits = 5;
+        String boltName = testable.getBoltName();
+        String spoutName = testable.getSpoutName();
+        // Third argument is presumably a timeout in seconds - confirm in TopoWrap.
+        topo.waitForProgress(minSpoutEmits, spoutName, 180);
+        topo.waitForProgress(minBoltEmits, boltName, 180);
+        List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName);
+        log.info(boltUrls.toString());
+        final List<LogData> allBoltData = topo.getLogData(boltName);
+        final List<LogData> allSpoutData = topo.getLogData(spoutName);
+        Assert.assertTrue(allBoltData.size() >= minBoltEmits,
+                "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size()
+                        + " \n\t" + allBoltData);
+        // The last windowSize / slideSize bolt entries are not verified.
+        final int numberOfWindows = allBoltData.size() - windowSize / slideSize;
+        for (int i = 0; i < numberOfWindows; ++i) {
+            log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows);
+            final int toIndex = (i + 1) * slideSize;
+            final int fromIndex = toIndex - windowSize;
+            // The first windows are shorter than windowSize; clamp at the start.
+            final int positiveFromIndex = fromIndex > 0 ? fromIndex : 0;
+            final List<LogData> windowData = allSpoutData.subList(positiveFromIndex, toIndex);
+            final String actualString = allBoltData.get(i).toString();
+            for (LogData oneLog : windowData) {
+                final String logStr = oneLog.getData();
+                Assert.assertTrue(actualString.contains(logStr),
+                        String.format("Missing: '%s' \nActual: '%s' \nCalculated window: '%s'",
+                                logStr, actualString, windowData));
+            }
+        }
+    }
+
+    /**
+     * Pairs of {windowSec, slideSec}. Pairs containing a non-positive value
+     * exercise the argument validation of the topology.
+     */
+    @DataProvider
+    public static Object[][] generateTimeWindows() {
+        final Object[][] objects = new Object[][]{
+                {-1, 10},
+                {10, -1},
+                {0, 10},
+                {10, 0},
+                {0, 0},
+                {-1, -1},
+                {1, 1},
+                {5, 2},
+                {2, 5},
+                {20, 5},
+                {20, 10},
+        };
+        return objects;
+    }
+
+    /**
+     * For non-positive durations, expects {@code newTopology()} to throw
+     * {@link IllegalArgumentException}; otherwise submits the topology and
+     * verifies the time based windows.
+     */
+    @Test(dataProvider = "generateTimeWindows")
+    public void testTimeWindow(int windowSec, int slideSec) throws Exception {
+        final SlidingTimeCorrectness testable = new SlidingTimeCorrectness(windowSec, slideSec);
+        final String topologyName = this.getClass().getSimpleName() + "w" + windowSec + "s" + slideSec;
+        if (windowSec <= 0 || slideSec <= 0) {
+            try {
+                testable.newTopology();
+                Assert.fail("Expected IllegalArgumentException was not thrown.");
+            } catch (IllegalArgumentException ignore) {
+                return;
+            }
+        }
+        topo = new TopoWrap(cluster, topologyName, testable.newTopology());
+        runAndVerifyTime(windowSec, slideSec, testable, topo);
+    }
+
+    /**
+     * Submits the topology, waits for progress of spout and bolt, then
+     * recomputes each time window from the spout data and checks that the
+     * window logged by the bolt holds exactly the same entries.
+     */
+    static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo)
+            throws TException, MalformedURLException {
+        topo.submitSuccessfully();
+        final int minSpoutEmits = 1000 + windowSec;
+        final int minBoltEmits = 5;
+        String boltName = testable.getBoltName();
+        String spoutName = testable.getSpoutName();
+        // Third argument is presumably a timeout in seconds - confirm in TopoWrap.
+        topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
+        topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
+        final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS);
+        final List<LogData> allBoltLog = topo.getLogData(boltName);
+        final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS);
+        Assert.assertTrue(allBoltLog.size() >= minBoltEmits,
+                "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size()
+                        + " \n\t" + allBoltLog);
+        // End of the first window: first spout timestamp moved to the next slideSec boundary.
+        final DateTime firstEndTime = TimeUtil.ceil(
+                new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
+        // The last windowSec / slideSec bolt entries are not verified.
+        final int numberOfWindows = allBoltLog.size() - windowSec / slideSec;
+        for (int i = 0; i < numberOfWindows; ++i) {
+            final DateTime toDate = firstEndTime.plusSeconds(i * slideSec);
+            final DateTime fromDate = toDate.minusSeconds(windowSec);
+            log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i + 1) + "/"
+                    + numberOfWindows);
+            final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData, fromDate, toDate);
+            final LogData oneBoltLog = allBoltLog.get(i);
+            final TimeDataWindow actualWindow = allBoltData.get(i);
+            log.info("Actual window: " + actualWindow.getDescription());
+            log.info("Computed window: " + computedWindow.getDescription());
+            // Compare in both directions: nothing missing and nothing extra.
+            for (TimeData oneLog : computedWindow) {
+                Assert.assertTrue(actualWindow.contains(oneLog),
+                        String.format("Missing: '%s' \n\tActual: '%s' \n\tComputed window: '%s'",
+                                oneLog, oneBoltLog, computedWindow));
+            }
+            for (TimeData oneLog : actualWindow) {
+                Assert.assertTrue(computedWindow.contains(oneLog),
+                        String.format("Extra: '%s' \n\tActual: '%s' \n\tComputed window: '%s'",
+                                oneLog, oneBoltLog, computedWindow));
+            }
+        }
+    }
+
+    /**
+     * Kills the topology submitted by the test method, if any. The reference
+     * is cleared afterwards so that a following invocation which returns
+     * before submitting anything does not kill the same topology again.
+     */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        if (topo != null) {
+            try {
+                topo.killQuietly();
+            } finally {
+                topo = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
new file mode 100644
index 0000000..450219d
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.tests.window;
+
+import org.apache.storm.st.helper.AbstractTest;
+import org.apache.storm.st.wrapper.TopoWrap;
+import org.apache.storm.st.topology.window.TumblingTimeCorrectness;
+import org.apache.storm.st.topology.window.TumblingWindowCorrectness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for tumbling windows. A tumbling window is verified as
+ * a sliding window whose slide equals its size, reusing the checks of
+ * {@link SlidingWindowTest}.
+ */
+public final class TumblingWindowTest extends AbstractTest {
+    private static final Logger log = LoggerFactory.getLogger(TumblingWindowTest.class);
+    /** Topology submitted by the current test method; null when none is running. */
+    TopoWrap topo;
+
+    /** Tumble sizes in tuples; non-positive values exercise the argument validation. */
+    @DataProvider
+    public static Object[][] generateWindows() {
+        final Object[][] objects = new Object[][]{
+                {-1},
+                {0},
+                {1},
+                {10},
+                {250},
+                {500},
+        };
+        return objects;
+    }
+
+    /**
+     * For a non-positive size, expects {@code newTopology()} to throw
+     * {@link IllegalArgumentException}; otherwise submits the topology and
+     * verifies the count based windows.
+     */
+    @Test(dataProvider = "generateWindows")
+    public void testTumbleCount(int tumbleSize) throws Exception {
+        final TumblingWindowCorrectness testable = new TumblingWindowCorrectness(tumbleSize);
+        final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSize;
+        if (tumbleSize <= 0) {
+            try {
+                testable.newTopology();
+                Assert.fail("Expected IllegalArgumentException was not thrown.");
+            } catch (IllegalArgumentException ignore) {
+                return;
+            }
+        }
+        topo = new TopoWrap(cluster, topologyName, testable.newTopology());
+        SlidingWindowTest.runAndVerifyCount(tumbleSize, tumbleSize, testable, topo);
+    }
+
+    /** Tumble durations in seconds; non-positive values exercise the argument validation. */
+    @DataProvider
+    public static Object[][] generateTumbleTimes() {
+        final Object[][] objects = new Object[][]{
+                {-1},
+                {0},
+                {1},
+                {2},
+                {5},
+                {10},
+        };
+        return objects;
+    }
+
+    /**
+     * For a non-positive duration, expects {@code newTopology()} to throw
+     * {@link IllegalArgumentException}; otherwise submits the topology and
+     * verifies the time based windows.
+     */
+    @Test(dataProvider = "generateTumbleTimes")
+    public void testTumbleTime(int tumbleSec) throws Exception {
+        final TumblingTimeCorrectness testable = new TumblingTimeCorrectness(tumbleSec);
+        final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSec;
+        if (tumbleSec <= 0) {
+            try {
+                testable.newTopology();
+                Assert.fail("Expected IllegalArgumentException was not thrown.");
+            } catch (IllegalArgumentException ignore) {
+                return;
+            }
+        }
+        topo = new TopoWrap(cluster, topologyName, testable.newTopology());
+        SlidingWindowTest.runAndVerifyTime(tumbleSec, tumbleSec, testable, topo);
+    }
+
+    /**
+     * Kills the topology submitted by the test method, if any. The reference
+     * is cleared afterwards so that a following invocation which returns
+     * before submitting anything does not kill the same topology again.
+     */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        if (topo != null) {
+            try {
+                topo.killQuietly();
+            } finally {
+                topo = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java 
b/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java
new file mode 100644
index 0000000..2d91892
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.utils;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Assertion helpers built on TestNG's {@link Assert}, with messages that
+ * include the offending value.
+ */
+public class AssertUtil {
+    private static final Logger log = LoggerFactory.getLogger(AssertUtil.class);
+
+    /** Asserts that {@code collection} is null or has no elements. */
+    public static void empty(Collection<?> collection) {
+        Assert.assertTrue(collection == null || collection.size() == 0,
+                "Expected collection to be empty, found: " + collection);
+    }
+
+    /**
+     * Asserts that {@code collection} is neither null nor empty.
+     *
+     * @param message prefix put in front of the failure description
+     */
+    public static void nonEmpty(Collection<?> collection, String message) {
+        Assert.assertNotNull(collection, message + " Expected collection to be non-null, found: " + collection);
+        greater(collection.size(), 0, message + " Expected collection to be non-empty, found: " + collection);
+    }
+
+    /** Asserts that {@code actual} is strictly greater than {@code expected}. */
+    public static void greater(int actual, int expected, String message) {
+        Assert.assertTrue(actual > expected, message);
+    }
+
+    /** Asserts that {@code path} is non-null and exists on the file system. */
+    public static void exists(File path) {
+        Assert.assertNotNull(path, "Supplied path was expected to be non null, found: " + path);
+        Assert.assertTrue(path.exists(), "Supplied path was expected to exist, found: " + path);
+    }
+
+    /** Asserts that {@code collection} holds exactly one element. */
+    public static void assertOneElement(Collection<?> collection) {
+        assertNElements(collection, 1);
+    }
+
+    /** Asserts that {@code collection} is non-null and holds exactly {@code expectedCount} elements. */
+    public static void assertNElements(Collection<?> collection, int expectedCount) {
+        String message = "Unexpected number of elements in the collection: " + collection;
+        // Fail with a readable assertion instead of a NullPointerException.
+        Assert.assertNotNull(collection, message);
+        Assert.assertEquals(collection.size(), expectedCount, message);
+    }
+
+    /** Asserts that {@code collection} holds exactly two elements. */
+    public static void assertTwoElements(Collection<?> collection) {
+        assertNElements(collection, 2);
+    }
+
+    /**
+     * Asserts that every string of {@code expectedOutput} occurs in
+     * {@code actualOutput} MORE than {@code requiredMatchCount} times.
+     *
+     * @param actualOutput       text that is searched
+     * @param expectedOutput     strings to look for
+     * @param requiredMatchCount exclusive lower bound for the number of occurrences
+     */
+    public static void assertMatchCount(String actualOutput, List<String> expectedOutput,
+                                        int requiredMatchCount) {
+        for (String oneExpectedOutput : expectedOutput) {
+            final int matchCount = StringUtils.countMatches(actualOutput, oneExpectedOutput);
+            log.info("In output, found " + matchCount + " occurrences of: " + oneExpectedOutput);
+            Assert.assertTrue(matchCount > requiredMatchCount,
+                    "Found " + matchCount + " occurrence(s) of " + oneExpectedOutput
+                            + " in output, expected more than " + requiredMatchCount);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java
new file mode 100644
index 0000000..b042713
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.wrapper;
+
+import org.apache.storm.st.utils.AssertUtil;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.apache.storm.st.utils.StringDecorator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * One parsed line of a worker log: the timestamp at the start of the line
+ * and the data that follows it. Instances are ordered by timestamp.
+ */
+public class LogData implements Comparable<LogData> {
+    // Sample timestamp in the format of the worker logs; only its length is used.
+    private static final int dateLen = "2016-05-04 23:38:10.702".length();
+    private static final DateTimeFormatter dateFormat =
+            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    private final DateTime logDate;
+    private final String data;
+
+    /**
+     * Parses a log line. The stripped line is split in two with
+     * {@code StringDecorator.split2}; the first {@code dateLen} characters of
+     * the first part are parsed as the timestamp and the second part is kept
+     * as the data.
+     *
+     * @param logLine raw line from a worker log
+     */
+    public LogData(String logLine) {
+        final String stripped = StringUtils.strip(logLine);
+        final List<String> parts = Arrays.asList(StringDecorator.split2(stripped));
+        AssertUtil.assertTwoElements(parts);
+        final String timestamp = parts.get(0).substring(0, dateLen);
+        this.logDate = dateFormat.parseDateTime(timestamp);
+        this.data = parts.get(1);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("LogData{");
+        text.append("logDate=").append(dateFormat.print(logDate));
+        text.append(", data='").append(getData()).append('\'');
+        text.append('}');
+        return text.toString();
+    }
+
+    /** Orders log entries by their timestamp only. */
+    @Override
+    public int compareTo(LogData that) {
+        final DateTime other = that.logDate;
+        return logDate.compareTo(other);
+    }
+
+    /** @return the part of the line that follows the timestamp part */
+    public String getData() {
+        return this.data;
+    }
+
+    /** @return the timestamp parsed from the start of the line */
+    public DateTime getLogDate() {
+        return this.logDate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
new file mode 100644
index 0000000..7311d5b
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.wrapper;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.st.utils.AssertUtil;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class StormCluster {
+    private static Logger log = LoggerFactory.getLogger(StormCluster.class);
+    private final Nimbus.Client client;
+
+    public StormCluster() {
+        Map conf = getConfig();
+        this.client = NimbusClient.getConfiguredClient(conf).getClient();
+    }
+
+    public static Map getConfig() {
+        return Utils.readStormConfig();
+    }
+
+    public static boolean isSecure() {
+        final String thriftConfig = "" + 
getConfig().get("storm.thrift.transport");
+        final String thriftConfigInSecCluster = 
"org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin";
+        return thriftConfigInSecCluster.equals(thriftConfig.trim());
+    }
+
+    public List<TopologySummary> getSummaries() throws TException {
+        final ClusterSummary clusterInfo = client.getClusterInfo();
+        log.info("Cluster info: " + clusterInfo);
+        return clusterInfo.get_topologies();
+    }
+
+    public List<TopologySummary> getActive() throws TException {
+        return getTopologiesWithStatus("active");
+    }
+
+    public List<TopologySummary> getKilled() throws TException {
+        return getTopologiesWithStatus("killed");
+    }
+
+    private List<TopologySummary> getTopologiesWithStatus(final String 
expectedStatus) throws TException {
+        Collection<TopologySummary> topologySummaries = getSummaries();
+        Collection<TopologySummary> filteredSummary = 
Collections2.filter(topologySummaries, new Predicate<TopologySummary>() {
+            @Override
+            public boolean apply(@Nullable TopologySummary input) {
+                return input != null && 
input.get_status().toLowerCase().equals(expectedStatus.toLowerCase());
+            }
+        });
+        return new ArrayList<>(filteredSummary);
+    }
+
+    public void killSilently(String topologyName) {
+        try {
+            client.killTopologyWithOpts(topologyName, new KillOptions());
+            log.info("Topology killed: " + topologyName);
+        } catch (Throwable e){
+            log.warn("Couldn't kill topology: " + topologyName + " Exception: 
" + ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    public TopologySummary getOneActive() throws TException {
+        List<TopologySummary> topoSummaries = getActive();
+        AssertUtil.nonEmpty(topoSummaries, "Expecting one active topology.");
+        Assert.assertEquals(topoSummaries.size(), 1, "Expected one topology to 
be running, found: " + topoSummaries);
+        return topoSummaries.get(0);
+    }
+
+    public TopologyInfo getInfo(TopologySummary topologySummary) throws 
TException {
+        return client.getTopologyInfo(topologySummary.get_id());
+    }
+
+    public Nimbus.Client getNimbusClient() {
+        return client;
+    }
+
+    public void killActiveTopologies() throws TException {
+        List<TopologySummary> activeTopologies = getActive();
+        for (TopologySummary activeTopology : activeTopologies) {
+            killSilently(activeTopology.get_name());
+        }
+
+        AssertUtil.empty(getActive());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
new file mode 100644
index 0000000..be1db6b
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.wrapper;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ComponentAggregateStats;
+import org.apache.storm.generated.ComponentPageInfo;
+import org.apache.storm.generated.ExecutorAggregateStats;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.st.utils.AssertUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.st.topology.window.data.FromJson;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Test-side handle for one topology: submits it with {@link StormSubmitter}, polls Nimbus for its
+ * emit counts, and downloads and parses the worker logs of its components.
+ */
+public class TopoWrap {
+    private static final Logger log = LoggerFactory.getLogger(TopoWrap.class);
+    private final StormCluster cluster;
+
+    private final String name;
+    private final StormTopology topology;
+    // Topology id reported by the cluster; assigned in submitSuccessfully(), null before that.
+    private String id;
+    /** Base configuration that {@link #submit(ImmutableMap)} merges into every submission. */
+    public static Map<String, Object> submitConf = getSubmitConf();
+    static {
+        // Point the "storm.jar" system property at the jar located by getJarPath().
+        String jarFile = getJarPath();
+        log.info("setting storm.jar to: " + jarFile);
+        System.setProperty("storm.jar", jarFile);
+    }
+
+    /**
+     * @param cluster  cluster used for all Nimbus queries
+     * @param name     name under which the topology is submitted and looked up
+     * @param topology topology to submit
+     */
+    public TopoWrap(StormCluster cluster, String name, StormTopology topology) {
+        this.cluster = cluster;
+        this.name = name;
+        this.topology = topology;
+    }
+
+    /**
+     * Submits the topology using a copy of {@link #submitConf} overlaid with the given entries.
+     *
+     * @param of extra configuration; its entries override those of {@link #submitConf}
+     */
+    public void submit(ImmutableMap<String, Object> of) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        final HashMap<String, Object> newConfig = new HashMap<>(submitConf);
+        newConfig.putAll(of);
+        StormSubmitter.submitTopologyWithProgressBar(name, newConfig, topology);
+    }
+
+    /** Builds the default submission configuration used to initialise {@link #submitConf}. */
+    private static Map<String, Object> getSubmitConf() {
+        Map<String, Object> submitConf = new HashMap<>();
+        submitConf.put("storm.zookeeper.topology.auth.scheme", "digest");
+        submitConf.put("topology.workers", 3);
+        submitConf.put("topology.debug", true);
+        return submitConf;
+    }
+
+    /**
+     * Searches the directory named by the {@code user.dir} system property recursively for jar
+     * files, skipping any whose name contains "surefirebooter", and returns the absolute path of
+     * the first one whose path does not contain "original". Fails an assertion when the property
+     * is unset or no suitable jar is found.
+     */
+    private static String getJarPath() {
+        final String USER_DIR = "user.dir";
+        String userDirVal = System.getProperty(USER_DIR);
+        Assert.assertNotNull(userDirVal, "property " + USER_DIR + " was not set.");
+        File projectDir = new File(userDirVal);
+        AssertUtil.exists(projectDir);
+        Collection<File> allJars = FileUtils.listFiles(projectDir, new String[]{"jar"}, true);
+        final Collection<File> jarFiles = Collections2.filter(allJars, new Predicate<File>() {
+            @Override
+            public boolean apply(@Nullable File input) {
+                return input != null && !input.getName().contains("surefirebooter");
+            }
+        });
+        log.info("Found jar files: " + jarFiles);
+        AssertUtil.nonEmpty(jarFiles, "The jar file is missing - did you run 'mvn clean package -DskipTests' before running tests ?");
+        String jarFile = null;
+        for (File jarPath : jarFiles) {
+            log.info("jarPath = " + jarPath);
+            if (jarPath != null && !jarPath.getPath().contains("original")) {
+                AssertUtil.exists(jarPath);
+                jarFile = jarPath.getAbsolutePath();
+                break;
+            }
+        }
+        Assert.assertNotNull(jarFile, "Couldn't detect a suitable jar file for uploading.");
+        log.info("jarFile = " + jarFile);
+        return jarFile;
+    }
+
+    /**
+     * Submits the topology, asserts that the cluster reports it as active, and remembers its id
+     * for later queries.
+     *
+     * @param config extra configuration passed to {@link #submit(ImmutableMap)}
+     */
+    public void submitSuccessfully(ImmutableMap<String, Object> config) throws TException {
+        submit(config);
+        TopologySummary topologySummary = getSummary();
+        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
+        Assert.assertEquals(topologySummary.get_status().toLowerCase(Locale.ROOT), "active", "Topology must be active.");
+        id = topologySummary.get_id();
+    }
+
+    /** Same as {@link #submitSuccessfully(ImmutableMap)} with no extra configuration. */
+    public void submitSuccessfully() throws TException {
+        submitSuccessfully(ImmutableMap.<String, Object>of());
+    }
+
+    /**
+     * Finds this topology's summary by name among all summaries reported by the cluster; the match
+     * is checked with {@code AssertUtil.assertOneElement} before it is returned.
+     */
+    private TopologySummary getSummary() throws TException {
+        List<TopologySummary> allTopos = cluster.getSummaries();
+        Collection<TopologySummary> oneTopo = Collections2.filter(allTopos, new Predicate<TopologySummary>() {
+            @Override
+            public boolean apply(@Nullable TopologySummary input) {
+                return input != null && input.get_name().equals(name);
+            }
+        });
+        AssertUtil.assertOneElement(oneTopo);
+        return oneTopo.iterator().next();
+    }
+
+    /** @return the topology info Nimbus reports for the id stored by submitSuccessfully() */
+    public TopologyInfo getInfo() throws TException {
+        return cluster.getNimbusClient().getTopologyInfo(id);
+    }
+
+    /**
+     * Sums the ":all-time" emitted count on the "default" stream over all executors of the given
+     * component. Executors of other components, or without stats, contribute zero.
+     *
+     * @param componentId id of the component whose executors are counted
+     */
+    public long getAllTimeEmittedCount(final String componentId) throws TException {
+        TopologyInfo info = getInfo();
+        final List<ExecutorSummary> executors = info.get_executors();
+        List<Long> emitCounts = Lists.transform(executors, new Function<ExecutorSummary, Long>() {
+            @Nullable
+            @Override
+            public Long apply(@Nullable ExecutorSummary input) {
+                if (input == null || !input.get_component_id().equals(componentId)) {
+                    return 0L;
+                }
+                String since = ":all-time";
+                return getEmittedCount(input, since);
+            }
+
+            //possible values for since are strings :all-time, 600, 10800, 86400
+            public Long getEmittedCount(@Nonnull ExecutorSummary input, @Nonnull String since) {
+                ExecutorStats executorStats = input.get_stats();
+                if (executorStats == null) {
+                    return 0L;
+                }
+                Map<String, Map<String, Long>> emitted = executorStats.get_emitted();
+                if (emitted == null) {
+                    return 0L;
+                }
+                Map<String, Long> allTime = emitted.get(since);
+                if (allTime == null) {
+                    return 0L;
+                }
+                // May be null when nothing was emitted on "default"; sum() skips nulls.
+                return allTime.get("default");
+            }
+        });
+        return sum(emitCounts).longValue();
+    }
+
+    /**
+     * Builds the distinct log URLs (one per host/port pair) of the executors that Nimbus reports
+     * for the given component.
+     *
+     * @param componentId component whose executors are looked up
+     * @throws MalformedURLException if a URL cannot be built from an executor's host and port
+     */
+    public List<ExecutorURL> getLogUrls(final String componentId) throws TException, MalformedURLException {
+        ComponentPageInfo componentPageInfo = cluster.getNimbusClient().getComponentPageInfo(id, componentId, null, false);
+        List<ExecutorAggregateStats> execStats = componentPageInfo.get_exec_stats();
+        //http://supervisor2:8000/download/DemoTest-26-1462229009%2F6703%2Fworker.log
+        //http://supervisor2:8000/log?file=SlidingWindowCountTest-9-1462388349%2F6703%2Fworker.log
+        final int logViewerPort = 8000;
+        // A set, because several executors can share one worker and therefore one log file.
+        Set<ExecutorURL> urls = new HashSet<>();
+        for (ExecutorAggregateStats execStat : execStats) {
+            ExecutorSummary execSummary = execStat.get_exec_summary();
+            String host = execSummary.get_host();
+            int executorPort = execSummary.get_port();
+            urls.add(new ExecutorURL(componentId, host, logViewerPort, executorPort, id));
+        }
+        return new ArrayList<>(urls);
+    }
+
+    /**
+     * Polls the component's all-time emit count every 10 seconds, at most
+     * {@code ceil(maxWaitSec / 10)} times, and returns as soon as the count exceeds
+     * {@code minEmits}. Returns normally even if the count is never reached.
+     */
+    public void waitForProgress(int minEmits, String componentName, int maxWaitSec) throws TException {
+        for(int i = 0; i < (maxWaitSec+9)/10; ++i) {
+            log.info(getInfo().toString());
+            long emitCount = getAllTimeEmittedCount(componentName);
+            log.info("Count for component " + componentName + " is " + emitCount);
+            // NOTE(review): strictly greater here, while assertProgress() accepts a count equal
+            // to minEmits - confirm which of the two is intended.
+            if (emitCount > minEmits) {
+                break;
+            }
+            TimeUtil.sleepSec(10);
+        }
+    }
+
+    /**
+     * Waits as {@link #waitForProgress(int, String, int)} does and then asserts that the
+     * component has emitted at least {@code minEmits} tuples.
+     */
+    public void assertProgress(int minEmits, String componentName, int maxWaitSec) throws TException {
+        waitForProgress(minEmits, componentName, maxWaitSec);
+        long emitCount = getAllTimeEmittedCount(componentName);
+        Assert.assertTrue(emitCount >= minEmits, "Count for component " + componentName + " is " + emitCount + " min is " + minEmits);
+    }
+
+    /**
+     * The log-viewer "view" and "download" URLs of one worker log, tagged with the component the
+     * lookup was made for. Equality covers the component id and both URLs.
+     */
+    public static class ExecutorURL {
+        private String componentId;
+        private URL viewUrl;
+        private URL downloadUrl;
+
+        // NOTE(review): equals()/hashCode() delegate to java.net.URL, whose implementations may
+        // resolve host names - confirm that is acceptable for the HashSet use in getLogUrls().
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof ExecutorURL)) return false;
+
+            ExecutorURL that = (ExecutorURL) o;
+
+            if (componentId != null ? !componentId.equals(that.componentId) : that.componentId != null) return false;
+            if (getViewUrl() != null ? !getViewUrl().equals(that.getViewUrl()) : that.getViewUrl() != null)
+                return false;
+            return getDownloadUrl() != null ? getDownloadUrl().equals(that.getDownloadUrl()) : that.getDownloadUrl() == null;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = componentId != null ? componentId.hashCode() : 0;
+            result = 31 * result + (getViewUrl() != null ? getViewUrl().hashCode() : 0);
+            result = 31 * result + (getDownloadUrl() != null ? getDownloadUrl().hashCode() : 0);
+            return result;
+        }
+
+        /**
+         * @param componentId   component the URLs belong to
+         * @param host          host used in both URLs
+         * @param logViewerPort port used in both URLs
+         * @param executorPort  worker port; becomes part of the log file path
+         * @param topoId        topology id; becomes part of the log file path
+         * @throws MalformedURLException if either URL string is not a valid URL
+         */
+        public ExecutorURL(String componentId, String host, int logViewerPort, int executorPort, String topoId) throws MalformedURLException {
+            String sep = "%2F"; //hex of "/"
+            String viewUrlStr = String.format("http://%s:%s/log?file=", host, logViewerPort);
+            String downloadUrlStr = String.format("http://%s:%s/download", host, logViewerPort);
+            viewUrl = new URL(String.format("%s/%s%s%d%sworker.log", viewUrlStr, topoId, sep, executorPort, sep));
+            downloadUrl = new URL(String.format("%s/%s%s%d%sworker.log", downloadUrlStr, topoId, sep, executorPort, sep));
+            this.componentId = componentId;
+        }
+
+        public URL getDownloadUrl() {
+            return downloadUrl;
+        }
+
+        public URL getViewUrl() {
+            return viewUrl;
+        }
+
+        @Override
+        public String toString() {
+            return "ExecutorURL{" +
+                    "componentId='" + componentId + '\'' +
+                    ", viewUrl=" + viewUrl +
+                    ", downloadUrl=" + downloadUrl +
+                    '}';
+        }
+    }
+
+    /**
+     * Fetches the component's log entries and converts the data of each one with
+     * {@code cls.fromJson}.
+     *
+     * @param componentId component whose logs are read
+     * @param cls         converter applied to the data of every log entry
+     * @return the converted entries, in the timestamp order produced by {@link #getLogData(String)}
+     */
+    public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) throws TException, MalformedURLException {
+        final List<LogData> logData = getLogData(componentId);
+        final List<T> data = new ArrayList<>(
+                Collections2.transform(logData, new Function<LogData, T>() {
+                    @Nullable
+                    @Override
+                    public T apply(@Nullable LogData input) {
+                        Assert.assertNotNull(input, "Expected LogData to be non-null.");
+                        return cls.fromJson(input.getData());
+                    }
+                }));
+        return data;
+    }
+
+    /**
+     * Downloads the component's worker logs, splits them into entries at every newline that is
+     * followed by a timestamp, keeps the entries for which {@code StringDecorator.isDecorated}
+     * holds, and returns them as {@link LogData} sorted by timestamp.
+     */
+    public List<LogData> getLogData(final String componentId) throws TException, MalformedURLException {
+        final String logs = getLogs(componentId);
+        final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
+        // Zero-width lookahead: split before the newline without consuming any text.
+        Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")");
+        final String[] strings = pattern.split(logs);
+        final Collection<String> interestingLogs = Collections2.filter(Arrays.asList(strings), new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && StringDecorator.isDecorated(input);
+            }
+        });
+        final Collection<LogData> logData = Collections2.transform(interestingLogs, new Function<String, LogData>() {
+            @Nullable
+            @Override
+            public LogData apply(@Nullable String input) {
+                return new LogData(input);
+            }
+        });
+        final ArrayList<LogData> sortedLogs = new ArrayList<>(logData);
+        Collections.sort(sortedLogs);
+        log.info("Found " + sortedLogs.size() + " items for component: " + componentId);
+        return sortedLogs;
+    }
+
+    /**
+     * Downloads the worker log behind every URL of the component and joins the contents with
+     * newlines. A log that cannot be fetched is replaced by a starred warning line.
+     *
+     * @param componentId component whose worker logs are fetched
+     */
+    public String getLogs(final String componentId) throws TException, MalformedURLException {
+        log.info("Fetching logs for componentId = " + componentId);
+        List<ExecutorURL> executorUrls = getLogUrls(componentId);
+        log.info("Found " + executorUrls.size() + " urls: " + executorUrls.toString());
+        Collection<String> urlOutputs = Collections2.transform(executorUrls, new Function<ExecutorURL, String>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable ExecutorURL executorURL) {
+                return fetchLog(executorURL);
+            }
+        });
+        return StringUtils.join(urlOutputs, '\n');
+    }
+
+    /**
+     * Fetches the content behind the download URL. Returns "" when there is no URL and a starred
+     * warning line when the download fails with an IOException.
+     */
+    private static String fetchLog(@Nullable ExecutorURL executorURL) {
+        if (executorURL == null || executorURL.getDownloadUrl() == null) {
+            return "";
+        }
+        String warnMessage = "Couldn't fetch executorURL: " + executorURL;
+        try {
+            log.info("Fetching: " + executorURL);
+            final URL downloadUrl = executorURL.getDownloadUrl();
+            final String urlContent = IOUtils.toString(downloadUrl);
+            if (urlContent.length() < 500) {
+                log.info("Fetched: " + urlContent);
+            } else {
+                log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes.");
+            }
+            // Constant-first comparison: System.getProperty returns null when the property is unset.
+            if ("true".equalsIgnoreCase(System.getProperty("regression.downloadWorkerLogs"))) {
+                saveLogCopy(downloadUrl, urlContent);
+            }
+            return urlContent;
+        } catch (IOException e) {
+            // Keep the cause in the log; the caller only sees the placeholder line below.
+            log.warn(warnMessage, e);
+        }
+        String stars = StringUtils.repeat("*", 30);
+        return stars + "   " + warnMessage + "   " + stars;
+    }
+
+    /**
+     * Best-effort copy of a fetched log to {@code <user.dir>/target/logs/<host>-<file>}; failures
+     * while creating the directory or writing the file are only logged.
+     */
+    private static void saveLogCopy(URL downloadUrl, String urlContent) {
+        final String userDir = System.getProperty("user.dir");
+        final File target = new File(userDir, "target");
+        final File logDir = new File(target, "logs");
+        // The URL file part is "/download/<log>", so index 2 of the split is the log name.
+        final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]);
+        try {
+            FileUtils.forceMkdir(logDir);
+            FileUtils.write(logFile, urlContent);
+        } catch (Throwable throwable) {
+            log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable));
+        }
+    }
+
+    /**
+     * Adds up the non-null numbers as doubles.
+     *
+     * @return the total, boxed as a {@code Double}
+     */
+    private Number sum(Collection<? extends Number> nums) {
+        // Primitive accumulator avoids re-boxing on every addition.
+        double total = 0.0;
+        for (Number num : nums) {
+            if (num != null) {
+                total += num.doubleValue();
+            }
+        }
+        return total;
+    }
+
+    /** Asks the cluster to kill this topology by name; failures are only logged by the cluster wrapper. */
+    public void killQuietly() {
+        cluster.killSilently(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/resources/storm-conf/storm.yaml
----------------------------------------------------------------------
diff --git a/integration-test/src/test/resources/storm-conf/storm.yaml 
b/integration-test/src/test/resources/storm-conf/storm.yaml
new file mode 100644
index 0000000..eca352f
--- /dev/null
+++ b/integration-test/src/test/resources/storm-conf/storm.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+storm.zookeeper.servers:
+    - "node1"
+
+nimbus.seeds: ["node1"]
+
+# netty transport
+storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
+storm.messaging.netty.buffer_size: 16384
+storm.messaging.netty.max_retries: 10
+storm.messaging.netty.min_wait_ms: 1000
+storm.messaging.netty.max_wait_ms: 5000
+
+drpc.servers:
+  - "node1"
+
+supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 
6709]

http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c47f75..7bc566d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -429,6 +429,9 @@
 
                                 <!-- Avro Serializer Test Resource -->
                                 
<exclude>**/src/test/resources/FixedAvroSerializer.config</exclude>
+
+                                <!-- Vagrant related files -->
+                                
<exclude>integration-test/config/.vagrant/**</exclude>
                             </excludes>
                         </configuration>
                     </plugin>

Reply via email to