Repository: storm Updated Branches: refs/heads/1.1.x-branch f62d0449e -> e672295c9
STORM-2807: Shut down integration test topologies immediately after tests, retry killing each topology for a minute and throw an error if it can't be done Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e672295c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e672295c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e672295c Branch: refs/heads/1.1.x-branch Commit: e672295c905fa772cb16c18ed3c10007c7678a9a Parents: f62d044 Author: Stig Rohde Døssing <[email protected]> Authored: Wed Nov 8 21:02:55 2017 +0100 Committer: Stig Rohde Døssing <[email protected]> Committed: Sat Nov 11 10:49:40 2017 +0100 ---------------------------------------------------------------------- integration-test/README.md | 2 +- .../window/TumblingTimeCorrectness.java | 6 ++--- .../window/TumblingWindowCorrectness.java | 2 +- .../test/java/org/apache/storm/st/DemoTest.java | 2 +- .../st/tests/window/SlidingWindowTest.java | 6 ++--- .../st/tests/window/TumblingWindowTest.java | 6 ++--- .../apache/storm/st/wrapper/StormCluster.java | 25 +++++++++++++------- .../org/apache/storm/st/wrapper/TopoWrap.java | 6 ++--- 8 files changed, 32 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/README.md ---------------------------------------------------------------------- diff --git a/integration-test/README.md b/integration-test/README.md index 36c70a1..ab78387 100644 --- a/integration-test/README.md +++ b/integration-test/README.md @@ -16,7 +16,7 @@ And runs all the tests against it. Running tests for development & debugging ========================================= -```vagrant up``` command is steup as a complete auto-pilot. +```vagrant up``` command is setup as a complete auto-pilot. Following describes how we can run individual tests against this vagrant cluster or any other cluster. Configs for running http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java index a77836f..7cadd18 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java @@ -55,9 +55,9 @@ public class TumblingTimeCorrectness implements TestableTopology { private final String spoutName; private final String boltName; - public TumblingTimeCorrectness(int timbleSec) { - this.tumbleSec = timbleSec; - final String prefix = this.getClass().getSimpleName() + "-timbleSec" + timbleSec; + public TumblingTimeCorrectness(int tumbleSec) { + this.tumbleSec = tumbleSec; + final String prefix = this.getClass().getSimpleName() + "-tumbleSec" + tumbleSec; spoutName = prefix + "IncrementingSpout"; boltName = prefix + "VerificationBolt"; } http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java index 22c6d75..adcb9dd 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java @@ -54,7 +54,7 @@ public class TumblingWindowCorrectness implements TestableTopology { public TumblingWindowCorrectness(final int tumbleSize) { this.tumbleSize = tumbleSize; - final String prefix = this.getClass().getSimpleName() + "-tubleSize" + tumbleSize; + final String prefix = this.getClass().getSimpleName() + "-tumbleSize" + tumbleSize; spoutName = prefix + "IncrementingSpout"; boltName = prefix + "VerificationBolt"; } http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/test/java/org/apache/storm/st/DemoTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java index b011254..2c62d3f 100644 --- a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java +++ b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java @@ -79,7 +79,7 @@ public final class DemoTest extends AbstractTest { @AfterMethod public void cleanup() throws Exception { if (topo != null) { - topo.killQuietly(); + topo.killOrThrow(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java index c4200a0..29afe7a 100644 --- a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java @@ -66,7 +66,7 @@ public final class SlidingWindowTest extends AbstractTest { @Test(dataProvider = "generateCountWindows") public void testWindowCount(int windowSize, int slideSize) throws Exception { final SlidingWindowCorrectness testable = new SlidingWindowCorrectness(windowSize, slideSize); - final String topologyName = this.getClass().getSimpleName() + "w" + windowSize + "s" + slideSize; + final String topologyName = this.getClass().getSimpleName() + "-window" + windowSize + "-slide" + slideSize; if (windowSize <= 0 || slideSize <= 0) { try { testable.newTopology(); @@ -130,7 +130,7 @@ public final class SlidingWindowTest extends AbstractTest { @Test(dataProvider = "generateTimeWindows") public void testTimeWindow(int windowSec, int slideSec) throws Exception { final SlidingTimeCorrectness testable = new SlidingTimeCorrectness(windowSec, slideSec); - final String topologyName = this.getClass().getSimpleName() + "w" + windowSec + "s" + slideSec; + final String topologyName = this.getClass().getSimpleName() + "-window" + windowSec + "-slide" + slideSec; if (windowSec <= 0 || slideSec <= 0) { try { testable.newTopology(); @@ -181,7 +181,7 @@ public final class SlidingWindowTest extends AbstractTest { @AfterMethod public void cleanup() throws Exception { if (topo != null) { - topo.killQuietly(); + topo.killOrThrow(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java index 450219d..866e4c8 100644 --- a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java @@ -48,7 +48,7 @@ public final class TumblingWindowTest extends AbstractTest { @Test(dataProvider = "generateWindows") public void testTumbleCount(int tumbleSize) throws Exception { final TumblingWindowCorrectness testable = new TumblingWindowCorrectness(tumbleSize); - final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSize; + final String topologyName = this.getClass().getSimpleName() + "-size" + tumbleSize; if (tumbleSize <= 0) { try { testable.newTopology(); @@ -77,7 +77,7 @@ public final class TumblingWindowTest extends AbstractTest { @Test(dataProvider = "generateTumbleTimes") public void testTumbleTime(int tumbleSec) throws Exception { final TumblingTimeCorrectness testable = new TumblingTimeCorrectness(tumbleSec); - final String topologyName = this.getClass().getSimpleName() + "t" + tumbleSec; + final String topologyName = this.getClass().getSimpleName() + "-sec" + tumbleSec; if (tumbleSec <= 0) { try { testable.newTopology(); @@ -93,7 +93,7 @@ public final class TumblingWindowTest extends AbstractTest { @AfterMethod public void cleanup() throws Exception { if (topo != null) { - topo.killQuietly(); + topo.killOrThrow(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java index 7311d5b..51226e1 100644 --- a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; public class StormCluster { private static Logger log = LoggerFactory.getLogger(StormCluster.class); @@ -83,13 +84,21 @@ public class StormCluster { return new ArrayList<>(filteredSummary); } - public void killSilently(String topologyName) { - try { - client.killTopologyWithOpts(topologyName, new KillOptions()); - log.info("Topology killed: " + topologyName); - } catch (Throwable e){ - log.warn("Couldn't kill topology: " + topologyName + " Exception: " + ExceptionUtils.getFullStackTrace(e)); + public void killOrThrow(String topologyName) throws Exception { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() < start + TimeUnit.SECONDS.toMillis(60)) { + try { + KillOptions killOptions = new KillOptions(); + killOptions.set_wait_secs(0); + client.killTopologyWithOpts(topologyName, killOptions); + log.info("Topology killed: " + topologyName); + return; + } catch (Throwable e) { + log.warn("Couldn't kill topology: " + topologyName + ", going to retry soon. Exception: " + ExceptionUtils.getFullStackTrace(e)); + Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + } } + throw new RuntimeException("Failed to kill topology " + topologyName + ". Subsequent tests may fail because worker slots are occupied"); } public TopologySummary getOneActive() throws TException { @@ -107,10 +116,10 @@ public class StormCluster { return client; } - public void killActiveTopologies() throws TException { + public void killActiveTopologies() throws Exception { List<TopologySummary> activeTopologies = getActive(); for (TopologySummary activeTopology : activeTopologies) { - killSilently(activeTopology.get_name()); + killOrThrow(activeTopology.get_name()); } AssertUtil.empty(getActive()); http://git-wip-us.apache.org/repos/asf/storm/blob/e672295c/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java index 2be0402..5e58637 100644 --- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java @@ -223,7 +223,7 @@ public class TopoWrap { public void assertProgress(int minEmits, String componentName, int maxWaitSec) throws TException { waitForProgress(minEmits, componentName, maxWaitSec); long emitCount = getAllTimeEmittedCount(componentName); - Assert.assertTrue(emitCount >= minEmits, "Count for component " + componentName + " is " + emitCount + " min is " + minEmits); + Assert.assertTrue(emitCount >= minEmits, "Emit count for component '" + componentName + "' is " + emitCount + ", min is " + minEmits); } public static class ExecutorURL { @@ -372,7 +372,7 @@ public class TopoWrap { return retVal; } - public void killQuietly() { - cluster.killSilently(name); + public void killOrThrow() throws Exception { + cluster.killOrThrow(name); } }
