Repository: storm Updated Branches: refs/heads/master fe413ab5c -> 73640f0c7
http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java b/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java new file mode 100644 index 0000000..36bdfdd --- /dev/null +++ b/integration-test/src/main/java/org/apache/storm/st/utils/TimeUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.utils; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class TimeUtil { + private static Logger log = LoggerFactory.getLogger(TimeUtil.class); + + public static void sleepSec(int sec) { + try { + TimeUnit.SECONDS.sleep(sec); + } catch (InterruptedException e) { + log.warn("Caught exception: " + ExceptionUtils.getFullStackTrace(e)); + } + } + public static void sleepMilliSec(int milliSec) { + try { + TimeUnit.MILLISECONDS.sleep(milliSec); + } catch (InterruptedException e) { + log.warn("Caught exception: " + ExceptionUtils.getFullStackTrace(e)); + } + } + + public static DateTime floor(DateTime dateTime, int sec) { + long modValue = dateTime.getMillis() % (1000 * sec); + return dateTime.minus(modValue); + } + + public static DateTime ceil(DateTime dateTime, int sec) { + long modValue = dateTime.getMillis() % (1000 * sec); + return dateTime.minus(modValue).plusSeconds(sec); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/DemoTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java new file mode 100644 index 0000000..b011254 --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import org.apache.storm.st.helper.AbstractTest; +import org.apache.storm.st.wrapper.TopoWrap; +import org.apache.storm.ExclamationTopology; +import org.apache.storm.generated.TopologyInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; +import org.apache.storm.st.utils.TimeUtil; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.List; + +public final class DemoTest extends AbstractTest { + private static Logger log = LoggerFactory.getLogger(DemoTest.class); + private static Collection<String> words = Lists.newArrayList("nathan", "mike", "jackson", "golda", "bertels"); + private static Collection<String> exclaim2Oputput = Collections2.transform(words, new Function<String, String>() { + @Nullable + @Override + public String apply(@Nullable String input) { + return input + "!!!!!!"; + } + }); + protected final String topologyName = this.getClass().getSimpleName(); + private TopoWrap topo; + + @Test + public void testExclamationTopology() throws Exception { + topo = new TopoWrap(cluster, topologyName, ExclamationTopology.getStormTopology()); + topo.submitSuccessfully(); + final int minExclaim2Emits = 500; + final int minSpountEmits = 10000; + for(int i = 0; i < 10; ++i) { + TopologyInfo topologyInfo = topo.getInfo(); + log.info(topologyInfo.toString()); + long wordSpoutEmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.WORD); + long exclaim1EmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.EXCLAIM_1); + long exclaim2EmittedCount = topo.getAllTimeEmittedCount(ExclamationTopology.EXCLAIM_2); + log.info("wordSpoutEmittedCount for spout 'word' = " + wordSpoutEmittedCount); + log.info("exclaim1EmittedCount = " + exclaim1EmittedCount); + log.info("exclaim2EmittedCount = " + exclaim2EmittedCount); + if (exclaim2EmittedCount > minExclaim2Emits || wordSpoutEmittedCount > minSpountEmits) { + break; + } + TimeUtil.sleepSec(6); + } + List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(ExclamationTopology.WORD); + log.info(boltUrls.toString()); + final String actualOutput = topo.getLogs(ExclamationTopology.EXCLAIM_2); + for (String oneExpectedOutput : exclaim2Oputput) { + Assert.assertTrue(actualOutput.contains(oneExpectedOutput), "Couldn't find " + oneExpectedOutput + " in urls"); + } + } + + @AfterMethod + public void cleanup() throws Exception { + if (topo != null) { + topo.killQuietly(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java b/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java new file mode 100644 index 0000000..57c4930 --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/helper/AbstractTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.helper; + +import org.apache.storm.st.wrapper.StormCluster; + +public abstract class AbstractTest { + protected final StormCluster cluster = new StormCluster(); + static { + System.setProperty("user.timezone", "UTC"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java b/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java new file mode 100644 index 0000000..ed2f9ce --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/meta/TestngListener.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.meta; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; +import org.testng.IExecutionListener; +import org.testng.ITestContext; +import org.testng.ITestListener; +import org.testng.ITestResult; + +import java.util.Arrays; + +/** + * Testng listener class. This is useful for things that are applicable to all the tests as well + * taking actions that depend on test results. + */ +public class TestngListener implements ITestListener, IExecutionListener { + private static final Logger LOGGER = Logger.getLogger(TestngListener.class); + private final String hr = StringUtils.repeat("-", 100); + + private enum RunResult {SUCCESS, FAILED, SKIPPED, TestFailedButWithinSuccessPercentage } + + @Override + public void onTestStart(ITestResult result) { + LOGGER.info(hr); + LOGGER.info( + String.format("Testing going to start for: %s.%s(%s)", result.getTestClass().getName(), + result.getName(), Arrays.toString(result.getParameters()))); + NDC.push(result.getName()); + } + + private void endOfTestHook(ITestResult result, RunResult outcome) { + LOGGER.info( + String.format("Testing going to end for: %s.%s(%s) ----- Status: %s", result.getTestClass().getName(), + result.getName(), Arrays.toString(result.getParameters()), outcome)); + NDC.pop(); + LOGGER.info(hr); + } + + @Override + public void onTestSuccess(ITestResult result) { + endOfTestHook(result, RunResult.SUCCESS); + } + + @Override + public void onTestFailure(ITestResult result) { + endOfTestHook(result, RunResult.FAILED); + + LOGGER.info(ExceptionUtils.getStackTrace(result.getThrowable())); + LOGGER.info(hr); + } + + @Override + public void onTestSkipped(ITestResult result) { + endOfTestHook(result, RunResult.SKIPPED); + } + + @Override + public void onTestFailedButWithinSuccessPercentage(ITestResult result) { + endOfTestHook(result, RunResult.TestFailedButWithinSuccessPercentage); + } + + @Override + public void onStart(ITestContext context) { + } + + @Override + public void onFinish(ITestContext context) { + } + + @Override + public void onExecutionStart() { + } + + @Override + public void onExecutionFinish() { + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java new file mode 100644 index 0000000..c4200a0 --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.tests.window; + +import org.apache.storm.st.helper.AbstractTest; +import org.apache.storm.st.wrapper.LogData; +import org.apache.storm.st.wrapper.TopoWrap; +import org.apache.storm.thrift.TException; +import org.apache.storm.st.topology.TestableTopology; +import org.apache.storm.st.topology.window.SlidingTimeCorrectness; +import org.apache.storm.st.topology.window.SlidingWindowCorrectness; +import org.apache.storm.st.topology.window.data.TimeData; +import org.apache.storm.st.topology.window.data.TimeDataWindow; +import org.apache.storm.st.utils.TimeUtil; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.net.MalformedURLException; +import java.util.List; + +public final class SlidingWindowTest extends AbstractTest { + private static Logger log = LoggerFactory.getLogger(SlidingWindowTest.class); + private TopoWrap topo; + + @DataProvider + public static Object[][] generateCountWindows() { + final Object[][] objects = new Object[][]{ + {-1, 10}, + {10, -1}, + {0, 10}, + {10, 0}, + {0, 0}, + {-1, -1}, + {5, 10}, + {1, 1}, + {10, 5}, + {100, 10}, + {100, 100}, + {200, 100}, + {500, 100}, + }; + return objects; + } + + @Test(dataProvider = "generateCountWindows") + public void testWindowCount(int windowSize, int slideSize) throws Exception { + final SlidingWindowCorrectness testable = new SlidingWindowCorrectness(windowSize, slideSize); + final String topologyName = this.getClass().getSimpleName() + "w" + windowSize + "s" + slideSize; + if (windowSize <= 0 || slideSize <= 0) { + try { + testable.newTopology(); + Assert.fail("Expected IllegalArgumentException was not thrown."); + } catch (IllegalArgumentException ignore) { + return; + } + } + topo = new TopoWrap(cluster, topologyName, testable.newTopology()); + runAndVerifyCount(windowSize, slideSize, testable, topo); + } + + static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException { + topo.submitSuccessfully(); + final int minSpoutEmits = 1000 + windowSize; + final int minBoltEmits = 5; + String boltName = testable.getBoltName(); + String spoutName = testable.getSpoutName(); + topo.waitForProgress(minSpoutEmits, spoutName, 180); + topo.waitForProgress(minBoltEmits, boltName, 180); + List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName); + log.info(boltUrls.toString()); + final List<LogData> allBoltData = topo.getLogData(boltName); + final List<LogData> allSpoutData = topo.getLogData(spoutName); + Assert.assertTrue(allBoltData.size() >= minBoltEmits, + "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData); + final int numberOfWindows = allBoltData.size() - windowSize / slideSize; + for(int i = 0; i < numberOfWindows; ++i ) { + log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows); + final int toIndex = (i + 1) * slideSize; + final int fromIndex = toIndex - windowSize; + final int positiveFromIndex = fromIndex > 0 ? fromIndex : 0; + final List<LogData> windowData = allSpoutData.subList(positiveFromIndex, toIndex); + final String actualString = allBoltData.get(i).toString(); + for (LogData oneLog : windowData) { + final String logStr = oneLog.getData(); + Assert.assertTrue(actualString.contains(logStr), + String.format("Missing: '%s' \nActual: '%s' \nCalculated window: '%s'", logStr, actualString, windowData)); + } + } + } + + @DataProvider + public static Object[][] generateTimeWindows() { + final Object[][] objects = new Object[][]{ + {-1, 10}, + {10, -1}, + {0, 10}, + {10, 0}, + {0, 0}, + {-1, -1}, + {1, 1}, + {5, 2}, + {2, 5}, + {20, 5}, + {20, 10}, + }; + return objects; + } + + @Test(dataProvider = "generateTimeWindows") + public void testTimeWindow(int windowSec, int slideSec) throws Exception { + final SlidingTimeCorrectness testable = new SlidingTimeCorrectness(windowSec, slideSec); + final String topologyName = this.getClass().getSimpleName() + "w" + windowSec + "s" + slideSec; + if (windowSec <= 0 || slideSec <= 0) { + try { + testable.newTopology(); + Assert.fail("Expected IllegalArgumentException was not thrown."); + } catch (IllegalArgumentException ignore) { + return; + } + } + topo = new TopoWrap(cluster, topologyName, testable.newTopology()); + runAndVerifyTime(windowSec, slideSec, testable, topo); + } + + static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException { + topo.submitSuccessfully(); + final int minSpoutEmits = 1000 + windowSec; + final int minBoltEmits = 5; + String boltName = testable.getBoltName(); + String spoutName = testable.getSpoutName(); + topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec)); + topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec)); + final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS); + final List<LogData> allBoltLog = topo.getLogData(boltName); + final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS); + Assert.assertTrue(allBoltLog.size() >= minBoltEmits, + "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog); + final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec); + final int numberOfWindows = allBoltLog.size() - windowSec / slideSec; + for(int i = 0; i < numberOfWindows; ++i ) { + final DateTime toDate = firstEndTime.plusSeconds(i * slideSec); + final DateTime fromDate = toDate.minusSeconds(windowSec); + log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows); + final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate); + final LogData oneBoltLog = allBoltLog.get(i); + final TimeDataWindow actualWindow = allBoltData.get(i); + log.info("Actual window: " + actualWindow.getDescription()); + log.info("Computed window: " + computedWindow.getDescription()); + for (TimeData oneLog : computedWindow) { + Assert.assertTrue(actualWindow.contains(oneLog), + String.format("Missing: '%s' \n\tActual: '%s' \n\tComputed window: '%s'", oneLog, oneBoltLog, computedWindow)); + } + for (TimeData oneLog : actualWindow) { + Assert.assertTrue(computedWindow.contains(oneLog), + String.format("Extra: '%s' \n\tActual: '%s' \n\tComputed window: '%s'", oneLog, oneBoltLog, computedWindow)); + } + } + } + + @AfterMethod + public void cleanup() throws Exception { + if (topo != null) { + topo.killQuietly(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java new file mode 100644 index 0000000..450219d --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.tests.window; + +import org.apache.storm.st.helper.AbstractTest; +import org.apache.storm.st.wrapper.TopoWrap; +import org.apache.storm.st.topology.window.TumblingTimeCorrectness; +import org.apache.storm.st.topology.window.TumblingWindowCorrectness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public final class TumblingWindowTest extends AbstractTest { + private static Logger log = LoggerFactory.getLogger(TumblingWindowTest.class); + TopoWrap topo; + + @DataProvider + public static Object[][] generateWindows() { + final Object[][] objects = new Object[][]{ + {-1}, + {0}, + {1}, + {10}, + {250}, + {500}, + }; + return objects; + } + + @Test(dataProvider = "generateWindows") + public void testTumbleCount(int tumbleSize) throws Exception { + final TumblingWindowCorrectness testable = new TumblingWindowCorrectness(tumbleSize); + final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSize; + if (tumbleSize <= 0) { + try { + testable.newTopology(); + Assert.fail("Expected IllegalArgumentException was not thrown."); + } catch (IllegalArgumentException ignore) { + return; + } + } + topo = new TopoWrap(cluster, topologyName, testable.newTopology()); + SlidingWindowTest.runAndVerifyCount(tumbleSize, tumbleSize, testable, topo); + } + + @DataProvider + public static Object[][] generateTumbleTimes() { + final Object[][] objects = new Object[][]{ + {-1}, + {0}, + {1}, + {2}, + {5}, + {10}, + }; + return objects; + } + + @Test(dataProvider = "generateTumbleTimes") + public void testTumbleTime(int tumbleSec) throws Exception { + final TumblingTimeCorrectness testable = new TumblingTimeCorrectness(tumbleSec); + final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSec; + if (tumbleSec <= 0) { + try { + testable.newTopology(); + Assert.fail("Expected IllegalArgumentException was not thrown."); + } catch (IllegalArgumentException ignore) { + return; + } + } + topo = new TopoWrap(cluster, topologyName, testable.newTopology()); + SlidingWindowTest.runAndVerifyTime(tumbleSec, tumbleSec, testable, topo); + } + + @AfterMethod + public void cleanup() throws Exception { + if (topo != null) { + topo.killQuietly(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java b/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java new file mode 100644 index 0000000..2d91892 --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/utils/AssertUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.utils; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; + +import java.io.File; +import java.util.Collection; +import java.util.List; + +public class AssertUtil { + private static Logger log = LoggerFactory.getLogger(AssertUtil.class); + + public static void empty(Collection<?> collection) { + Assert.assertTrue(collection == null || collection.size() == 0, "Expected collection to be non-null, found: " + collection); + } + + public static void nonEmpty(Collection<?> collection, String message) { + Assert.assertNotNull(collection, message + " Expected collection to be non-null, found: " + collection); + greater(collection.size(), 0, message + " Expected collection to be non-empty, found: " + collection); + } + + public static void greater(int actual, int expected, String message) { + Assert.assertTrue(actual > expected, message); + } + + public static void exists(File path) { + Assert.assertNotNull(path, "Supplied path was expected to be non null, found: " + path); + Assert.assertTrue(path.exists(), "Supplied path was expected to be non null, found: " + path); + } + + public static void assertOneElement(Collection<?> collection) { + assertNElements(collection, 1); + } + + public static void assertNElements(Collection<?> collection, int expectedCount) { + String message = "Unexpected number of elements in the collection: " + collection; + Assert.assertEquals(collection.size(), expectedCount, message); + } + + public static void assertTwoElements(Collection<?> collection) { + assertNElements(collection, 2); + } + + public static void assertMatchCount(String actualOutput, List<String> expectedOutput, int requiredMatchCount) { + for (String oneExpectedOutput : expectedOutput) { + final int matchCount = StringUtils.countMatches(actualOutput, oneExpectedOutput); + log.info("In output, found " + matchCount + " occurrences of: " + oneExpectedOutput); + Assert.assertTrue(matchCount > requiredMatchCount, + "Found " + matchCount + "occurrence of " + oneExpectedOutput + " in urls, expected" + requiredMatchCount); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java new file mode 100644 index 0000000..b042713 --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/LogData.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.wrapper; + +import org.apache.storm.st.utils.AssertUtil; +import org.apache.commons.lang.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.apache.storm.st.utils.StringDecorator; + +import java.util.Arrays; +import java.util.List; + +public class LogData implements Comparable<LogData> { + private final DateTime logDate; + private final String data; + private static final int dateLen = "2016-05-04 23:38:10.702".length(); //format of date in worker logs + private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS"); + + public LogData(String logLine) { + DateTime tempDate; + final String[] pair = StringDecorator.split2(StringUtils.strip(logLine)); + final List<String> pairList = Arrays.asList(pair); + AssertUtil.assertTwoElements(pairList); + tempDate = dateFormat.parseDateTime(pairList.get(0).substring(0, dateLen)); + this.logDate = tempDate; + this.data = pairList.get(1); + } + + @Override + public String toString() { + return "LogData{" + + "logDate=" + dateFormat.print(logDate) + + ", data='" + getData() + '\'' + + '}'; + } + + @Override + public int compareTo(LogData that) { + return this.logDate.compareTo(that.logDate); + } + + public String getData() { + return data; + } + + public DateTime getLogDate() { + return logDate; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java new file mode 100644 index 0000000..7311d5b --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.wrapper; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.st.utils.AssertUtil; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.storm.thrift.TException; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class StormCluster { + private static Logger log = LoggerFactory.getLogger(StormCluster.class); + private final Nimbus.Client client; + + public StormCluster() { + Map conf = getConfig(); + this.client = NimbusClient.getConfiguredClient(conf).getClient(); + } + + public static Map getConfig() { + return Utils.readStormConfig(); + } + + public static boolean isSecure() { + final String thriftConfig = "" + getConfig().get("storm.thrift.transport"); + final String thriftConfigInSecCluster = "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin"; + return thriftConfigInSecCluster.equals(thriftConfig.trim()); + } + + public List<TopologySummary> getSummaries() throws TException { + final ClusterSummary clusterInfo = client.getClusterInfo(); + log.info("Cluster info: " + clusterInfo); + return clusterInfo.get_topologies(); + } + + public List<TopologySummary> getActive() throws TException { + return getTopologiesWithStatus("active"); + } + + public List<TopologySummary> getKilled() throws TException { + return getTopologiesWithStatus("killed"); + } + + private List<TopologySummary> getTopologiesWithStatus(final String expectedStatus) throws TException { + Collection<TopologySummary> topologySummaries = getSummaries(); + Collection<TopologySummary> filteredSummary = Collections2.filter(topologySummaries, new Predicate<TopologySummary>() { + @Override + public boolean apply(@Nullable TopologySummary input) { + return input != null && input.get_status().toLowerCase().equals(expectedStatus.toLowerCase()); + } + }); + return new ArrayList<>(filteredSummary); + } + + public void killSilently(String topologyName) { + try { + client.killTopologyWithOpts(topologyName, new KillOptions()); + log.info("Topology killed: " + topologyName); + } catch (Throwable e){ + log.warn("Couldn't kill topology: " + topologyName + " Exception: " + ExceptionUtils.getFullStackTrace(e)); + } + } + + public TopologySummary getOneActive() throws TException { + List<TopologySummary> topoSummaries = getActive(); + AssertUtil.nonEmpty(topoSummaries, "Expecting one active topology."); + Assert.assertEquals(topoSummaries.size(), 1, "Expected one topology to be running, found: " + topoSummaries); + return topoSummaries.get(0); + } + + public TopologyInfo getInfo(TopologySummary topologySummary) throws TException { + return client.getTopologyInfo(topologySummary.get_id()); + } + + public Nimbus.Client getNimbusClient() { + return client; + } + + public void killActiveTopologies() throws TException { + List<TopologySummary> activeTopologies = getActive(); + for (TopologySummary activeTopology : activeTopologies) { + killSilently(activeTopology.get_name()); + } + + AssertUtil.empty(getActive()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java new file mode 100644 index 0000000..be1db6b --- /dev/null +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.st.wrapper; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.ComponentAggregateStats; +import org.apache.storm.generated.ComponentPageInfo; +import org.apache.storm.generated.ExecutorAggregateStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.st.utils.AssertUtil; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.storm.StormSubmitter; +import org.apache.storm.thrift.TException; +import org.apache.storm.st.topology.window.data.FromJson; +import org.apache.storm.st.utils.StringDecorator; +import org.apache.storm.st.utils.TimeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class TopoWrap { + private static Logger log = LoggerFactory.getLogger(TopoWrap.class); + private final StormCluster cluster; + + private final String name; + private final StormTopology topology; + private String id; + public static Map<String, Object> submitConf = getSubmitConf(); + static { + String jarFile = getJarPath(); + log.info("setting storm.jar to: " + jarFile); + System.setProperty("storm.jar", jarFile); + } + + public TopoWrap(StormCluster cluster, String name, StormTopology topology) { + this.cluster = cluster; + this.name = name; + this.topology = topology; + } + + public void submit(ImmutableMap<String, Object> of) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + final HashMap<String, Object> newConfig = new HashMap<>(submitConf); + newConfig.putAll(of); + StormSubmitter.submitTopologyWithProgressBar(name, newConfig, topology); + } + + private static Map<String, Object> getSubmitConf() { + Map<String, Object> submitConf = new HashMap<>(); + submitConf.put("storm.zookeeper.topology.auth.scheme", "digest"); + submitConf.put("topology.workers", 3); + submitConf.put("topology.debug", true); + return submitConf; + } + + private static String getJarPath() { + final String USER_DIR = "user.dir"; + String userDirVal = System.getProperty(USER_DIR); + Assert.assertNotNull(userDirVal, "property " + USER_DIR + " was not set."); + File projectDir = new File(userDirVal); + AssertUtil.exists(projectDir); + Collection<File> allJars = FileUtils.listFiles(projectDir, new String[]{"jar"}, true); + final Collection<File> jarFiles = Collections2.filter(allJars, new Predicate<File>() { + @Override + public boolean apply(@Nullable File input) { + return input != null && !input.getName().contains("surefirebooter"); + } + }); + log.info("Found jar files: " + jarFiles); + AssertUtil.nonEmpty(jarFiles, "The jar file is missing - did you run 'mvn clean package -DskipTests' before running tests ?"); + String jarFile = null; + for (File jarPath : jarFiles) { + log.info("jarPath = " + jarPath); + if (jarPath != null && !jarPath.getPath().contains("original")) { + AssertUtil.exists(jarPath); + jarFile = jarPath.getAbsolutePath(); + break; + } + } + Assert.assertNotNull(jarFile, "Couldn't detect a suitable jar file for uploading."); + log.info("jarFile = " + jarFile); + return jarFile; + } + + public void submitSuccessfully(ImmutableMap<String, Object> config) throws TException { + submit(config); + TopologySummary topologySummary = getSummary(); + Assert.assertEquals(topologySummary.get_status().toLowerCase(), "active", "Topology must be active."); + id = topologySummary.get_id(); + } + + public void submitSuccessfully() throws TException { + submitSuccessfully(ImmutableMap.<String, Object>of()); + } + + private TopologySummary getSummary() throws TException { + List<TopologySummary> allTopos = cluster.getSummaries(); + Collection<TopologySummary> oneTopo = Collections2.filter(allTopos, new Predicate<TopologySummary>() { + @Override + public boolean apply(@Nullable TopologySummary input) { + return input != null && input.get_name().equals(name); + } + }); + AssertUtil.assertOneElement(oneTopo); + return oneTopo.iterator().next(); + } + + public TopologyInfo getInfo() throws TException { + return cluster.getNimbusClient().getTopologyInfo(id); + } + + public long getAllTimeEmittedCount(final String componentId) throws TException { + TopologyInfo info = getInfo(); + final List<ExecutorSummary> executors = info.get_executors(); + List<Long> ackCounts = Lists.transform(executors, new Function<ExecutorSummary, Long>() { + @Nullable + @Override + public Long apply(@Nullable ExecutorSummary input) { + if (input == null || !input.get_component_id().equals(componentId)) + return 0L; + String since = ":all-time"; + return getEmittedCount(input, since); + } + + //possible values for since are strings :all-time, 600, 10800, 86400 + public Long getEmittedCount(@Nonnull ExecutorSummary input, @Nonnull String since) { + ExecutorStats executorStats = input.get_stats(); + if (executorStats == null) + return 0L; + Map<String, Map<String, Long>> emitted = executorStats.get_emitted(); + if (emitted == null) + return 0L; + Map<String, Long> allTime = emitted.get(since); + if (allTime == null) + return 0L; + return allTime.get("default"); + } + }); + return sum(ackCounts).longValue(); + } + + public List<ExecutorURL> getLogUrls(final String componentId) throws TException, MalformedURLException { + ComponentPageInfo componentPageInfo = cluster.getNimbusClient().getComponentPageInfo(id, componentId, null, false); + Map<String, ComponentAggregateStats> windowToStats = componentPageInfo.get_window_to_stats(); + ComponentAggregateStats allTimeStats = windowToStats.get(":all-time"); + //Long emitted = (Long) allTimeStats.getFieldValue(ComponentAggregateStats._Fields.findByName("emitted")); + + + List<ExecutorAggregateStats> execStats = componentPageInfo.get_exec_stats(); + Set<ExecutorURL> urls = new HashSet<>(); + for (ExecutorAggregateStats execStat : execStats) { + ExecutorSummary execSummary = execStat.get_exec_summary(); + String host = execSummary.get_host(); + int executorPort = execSummary.get_port(); + //http://supervisor2:8000/download/DemoTest-26-1462229009%2F6703%2Fworker.log + //http://supervisor2:8000/log?file=SlidingWindowCountTest-9-1462388349%2F6703%2Fworker.log + int logViewerPort = 8000; + ExecutorURL executorURL = new ExecutorURL(componentId, host, logViewerPort, executorPort, id); + urls.add(executorURL); + } + return new ArrayList<>(urls); + } + + public void waitForProgress(int minEmits, String componentName, int maxWaitSec) throws TException { + for(int i = 0; i < (maxWaitSec+9)/10; ++i) { + log.info(getInfo().toString()); + long emitCount = getAllTimeEmittedCount(componentName); + log.info("Count for component " + componentName + " is " + emitCount); + if (emitCount > minEmits) { + break; + } + TimeUtil.sleepSec(10); + } + } + + public void assertProgress(int minEmits, String componentName, int maxWaitSec) throws TException { + waitForProgress(minEmits, componentName, maxWaitSec); + long emitCount = getAllTimeEmittedCount(componentName); + Assert.assertTrue(emitCount >= minEmits, "Count for component " + componentName + " is " + emitCount + " min is " + minEmits); + } + + public static class ExecutorURL { + private String componentId; + private URL viewUrl; + private URL downloadUrl; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ExecutorURL)) return false; + + ExecutorURL that = (ExecutorURL) o; + + if (componentId != null ? !componentId.equals(that.componentId) : that.componentId != null) return false; + if (getViewUrl() != null ? !getViewUrl().equals(that.getViewUrl()) : that.getViewUrl() != null) + return false; + return getDownloadUrl() != null ? getDownloadUrl().equals(that.getDownloadUrl()) : that.getDownloadUrl() == null; + + } + + @Override + public int hashCode() { + int result = componentId != null ? componentId.hashCode() : 0; + result = 31 * result + (getViewUrl() != null ? getViewUrl().hashCode() : 0); + result = 31 * result + (getDownloadUrl() != null ? getDownloadUrl().hashCode() : 0); + return result; + } + + public ExecutorURL(String componentId, String host, int logViewerPort, int executorPort, String topoId) throws MalformedURLException { + String sep = "%2F"; //hex of "/" + String viewUrlStr = String.format("http://%s:%s/log?file=", host, logViewerPort); + String downloadUrlStr = String.format("http://%s:%s/download", host, logViewerPort); + viewUrl = new URL(String.format("%s/%s%s%d%sworker.log", viewUrlStr, topoId, sep, executorPort, sep)); + downloadUrl = new URL(String.format("%s/%s%s%d%sworker.log", downloadUrlStr, topoId, sep, executorPort, sep)); + this.componentId = componentId; + } + + public URL getDownloadUrl() { + return downloadUrl; + } + + public URL getViewUrl() { + return viewUrl; + } + + @Override + public String toString() { + return "ExecutorURL{" + + "componentId='" + componentId + '\'' + + ", viewUrl=" + viewUrl + + ", downloadUrl=" + downloadUrl + + '}'; + } + } + + public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) throws TException, MalformedURLException { + final List<LogData> logData = getLogData(componentId); + final List<T> data = new ArrayList<>( + Collections2.transform(logData, new Function<LogData, T>() { + @Nullable + @Override + public T apply(@Nullable LogData input) { + Assert.assertNotNull(input, "Expected LogData to be non-null."); + return cls.fromJson(input.getData()); + } + })); + return data; + } + + public List<LogData> getLogData(final String componentId) throws TException, MalformedURLException { + final String logs = getLogs(componentId); + final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}"; + Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")"); + final String[] strings = pattern.split(logs); + final Collection<String> interestingLogs = Collections2.filter(Arrays.asList(strings), new Predicate<String>() { + @Override + public boolean apply(@Nullable String input) { + return input != null && StringDecorator.isDecorated(input); + } + }); + final Collection<LogData> logData = Collections2.transform(interestingLogs, new Function<String, LogData>() { + @Nullable + @Override + public LogData apply(@Nullable String input) { + return new LogData(input); + } + }); + final ArrayList<LogData> sortedLogs = new ArrayList<>(logData); + Collections.sort(sortedLogs); + log.info("Found " + sortedLogs.size() + " items for component: " + componentId); + return sortedLogs; + } + + public String getLogs(final String componentId) throws TException, MalformedURLException { + log.info("Fetching logs for componentId = " + componentId); + List<ExecutorURL> exclaim2Urls = getLogUrls(componentId); + log.info("Found " + exclaim2Urls.size() + " urls: " + exclaim2Urls.toString()); + Collection<String> urlOuputs = Collections2.transform(exclaim2Urls, new Function<ExecutorURL, String>() { + @Nullable + @Override + public String apply(@Nullable ExecutorURL executorURL) { + if (executorURL == null || executorURL.getDownloadUrl() == null) { + return ""; + } + String warnMessage = "Couldn't fetch executorURL: " + executorURL; + try { + log.info("Fetching: " + executorURL); + final URL downloadUrl = executorURL.downloadUrl; + final String urlContent = IOUtils.toString(downloadUrl); + if (urlContent.length() < 500) { + log.info("Fetched: " + urlContent); + } else { + log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes."); + } + if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) { + final String userDir = System.getProperty("user.dir"); + final File target = new File(userDir, "target"); + final File logDir = new File(target, "logs"); + final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]); + try { + FileUtils.forceMkdir(logDir); + FileUtils.write(logFile, urlContent); + } catch (Throwable throwable) { + log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable)); + } + } + return urlContent; + } catch (IOException e) { + log.warn(warnMessage); + } + String stars = StringUtils.repeat("*", 30); + return stars + " " + warnMessage + " " + stars; + } + }); + return StringUtils.join(urlOuputs, '\n'); + } + + private Number sum(Collection<? extends Number> nums) { + Double retVal = 0.0; + for (Number num : nums) { + if(num != null) { + retVal += num.doubleValue(); + } + } + return retVal; + } + + public void killQuietly() { + cluster.killSilently(name); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/integration-test/src/test/resources/storm-conf/storm.yaml ---------------------------------------------------------------------- diff --git a/integration-test/src/test/resources/storm-conf/storm.yaml b/integration-test/src/test/resources/storm-conf/storm.yaml new file mode 100644 index 0000000..eca352f --- /dev/null +++ b/integration-test/src/test/resources/storm-conf/storm.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +storm.zookeeper.servers: + - "node1" + +nimbus.seeds: ["node1"] + +# netty transport +storm.messaging.transport: "org.apache.storm.messaging.netty.Context" +storm.messaging.netty.buffer_size: 16384 +storm.messaging.netty.max_retries: 10 +storm.messaging.netty.min_wait_ms: 1000 +storm.messaging.netty.max_wait_ms: 5000 + +drpc.servers: + - "node1" + +supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709] http://git-wip-us.apache.org/repos/asf/storm/blob/b779ca4e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5c47f75..7bc566d 100644 --- a/pom.xml +++ b/pom.xml @@ -429,6 +429,9 @@ <!-- Avro Serializer Test Resource --> <exclude>**/src/test/resources/FixedAvroSerializer.config</exclude> + + <!-- Vagrant related files --> + <exclude>integration-test/config/.vagrant/**</exclude> </excludes> </configuration> </plugin>