Repository: storm
Updated Branches:
  refs/heads/master ee1be2b73 -> 2657c25f2


STORM-2807: Shut down integration test topologies immediately after tests, 
retry killing each topology for a minute and throw an error if it can't be done


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3cb52398
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3cb52398
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3cb52398

Branch: refs/heads/master
Commit: 3cb52398a3242ce6d1e01113aefee1402bd034a8
Parents: 9371685
Author: Stig Rohde Døssing <[email protected]>
Authored: Wed Nov 8 21:02:55 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Fri Nov 10 09:08:29 2017 +0100

----------------------------------------------------------------------
 integration-test/README.md                      |  2 +-
 .../window/TumblingTimeCorrectness.java         |  6 ++---
 .../window/TumblingWindowCorrectness.java       |  2 +-
 .../test/java/org/apache/storm/st/DemoTest.java |  2 +-
 .../st/tests/window/SlidingWindowTest.java      |  6 ++---
 .../st/tests/window/TumblingWindowTest.java     |  6 ++---
 .../apache/storm/st/wrapper/StormCluster.java   | 25 +++++++++++++-------
 .../org/apache/storm/st/wrapper/TopoWrap.java   |  6 ++---
 8 files changed, 32 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
index 70ded80..8805f33 100644
--- a/integration-test/README.md
+++ b/integration-test/README.md
@@ -16,7 +16,7 @@ And runs all the tests against it.
 
 Running tests for development & debugging
 =========================================
-```vagrant up``` command is steup as a complete auto-pilot.
+```vagrant up``` command is setup as a complete auto-pilot.
 Following describes how we can run individual tests against this vagrant cluster or any other cluster.
 
 Configs for running

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
index 64d7441..80709c8 100644
--- 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -55,9 +55,9 @@ public class TumblingTimeCorrectness implements 
TestableTopology {
     private final String spoutName;
     private final String boltName;
 
-    public TumblingTimeCorrectness(int timbleSec) {
-        this.tumbleSec = timbleSec;
-        final String prefix = this.getClass().getSimpleName() + "-timbleSec" + 
timbleSec;
+    public TumblingTimeCorrectness(int tumbleSec) {
+        this.tumbleSec = tumbleSec;
+        final String prefix = this.getClass().getSimpleName() + "-tumbleSec" + 
tumbleSec;
         spoutName = prefix + "IncrementingSpout";
         boltName = prefix + "VerificationBolt";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
index 05351df..0157a49 100644
--- 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -53,7 +53,7 @@ public class TumblingWindowCorrectness implements 
TestableTopology {
 
     public TumblingWindowCorrectness(final int tumbleSize) {
         this.tumbleSize = tumbleSize;
-        final String prefix = this.getClass().getSimpleName() + "-tubleSize" + 
tumbleSize;
+        final String prefix = this.getClass().getSimpleName() + "-tumbleSize" 
+ tumbleSize;
         spoutName = prefix + "IncrementingSpout";
         boltName = prefix + "VerificationBolt";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java 
b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
index b011254..2c62d3f 100644
--- a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
+++ b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
@@ -79,7 +79,7 @@ public final class DemoTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
index 3de1a7d..13af8a1 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
@@ -67,7 +67,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateCountWindows")
     public void testWindowCount(int windowSize, int slideSize) throws 
Exception {
         final SlidingWindowCorrectness testable = new 
SlidingWindowCorrectness(windowSize, slideSize);
-        final String topologyName = this.getClass().getSimpleName() + "w" + 
windowSize + "s" + slideSize;
+        final String topologyName = this.getClass().getSimpleName() + 
"-window" + windowSize + "-slide" + slideSize;
         if (windowSize <= 0 || slideSize <= 0) {
             try {
                 testable.newTopology();
@@ -135,7 +135,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateTimeWindows")
     public void testTimeWindow(int windowSec, int slideSec) throws Exception {
         final SlidingTimeCorrectness testable = new 
SlidingTimeCorrectness(windowSec, slideSec);
-        final String topologyName = this.getClass().getSimpleName() + "w" + 
windowSec + "s" + slideSec;
+        final String topologyName = this.getClass().getSimpleName() + 
"-window" + windowSec + "-slide" + slideSec;
         if (windowSec <= 0 || slideSec <= 0) {
             try {
                 testable.newTopology();
@@ -187,7 +187,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
index 450219d..866e4c8 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
@@ -48,7 +48,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateWindows")
     public void testTumbleCount(int tumbleSize) throws Exception {
         final TumblingWindowCorrectness testable = new 
TumblingWindowCorrectness(tumbleSize);
-        final String topologyName = this.getClass().getSimpleName() + "t" + 
tumbleSize;
+        final String topologyName = this.getClass().getSimpleName() + "-size" 
+ tumbleSize;
         if (tumbleSize <= 0) {
             try {
                 testable.newTopology();
@@ -77,7 +77,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateTumbleTimes")
     public void testTumbleTime(int tumbleSec) throws Exception {
         final TumblingTimeCorrectness testable = new 
TumblingTimeCorrectness(tumbleSec);
-        final String topologyName = this.getClass().getSimpleName() + "t" + 
tumbleSec;
+        final String topologyName = this.getClass().getSimpleName() + "-sec" + 
tumbleSec;
         if (tumbleSec <= 0) {
             try {
                 testable.newTopology();
@@ -93,7 +93,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
index 5e1eb97..96492ca 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class StormCluster {
     private static Logger log = LoggerFactory.getLogger(StormCluster.class);
@@ -83,13 +84,21 @@ public class StormCluster {
         return new ArrayList<>(filteredSummary);
     }
 
-    public void killSilently(String topologyName) {
-        try {
-            client.killTopologyWithOpts(topologyName, new KillOptions());
-            log.info("Topology killed: " + topologyName);
-        } catch (Throwable e){
-            log.warn("Couldn't kill topology: " + topologyName + " Exception: " + ExceptionUtils.getFullStackTrace(e));
+    public void killOrThrow(String topologyName) throws Exception {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + TimeUnit.SECONDS.toMillis(60)) {
+            try {
+                KillOptions killOptions = new KillOptions();
+                killOptions.set_wait_secs(0);
+                client.killTopologyWithOpts(topologyName, killOptions);
+                log.info("Topology killed: " + topologyName);
+                return;
+            } catch (Throwable e) {
+                log.warn("Couldn't kill topology: " + topologyName + ", going to retry soon. Exception: " + ExceptionUtils.getFullStackTrace(e));
+                Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+            }
         }
+        throw new RuntimeException("Failed to kill topology " + topologyName + ". Subsequent tests may fail because worker slots are occupied");
     }
 
     public TopologySummary getOneActive() throws TException {
@@ -107,10 +116,10 @@ public class StormCluster {
         return client;
     }
 
-    public void killActiveTopologies() throws TException {
+    public void killActiveTopologies() throws Exception {
         List<TopologySummary> activeTopologies = getActive();
         for (TopologySummary activeTopology : activeTopologies) {
-            killSilently(activeTopology.get_name());
+            killOrThrow(activeTopology.get_name());
         }
 
         AssertUtil.empty(getActive());

http://git-wip-us.apache.org/repos/asf/storm/blob/3cb52398/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
index a0082a8..52b8646 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
@@ -229,7 +229,7 @@ public class TopoWrap {
     public void assertProgress(int minEmits, String componentName, int 
maxWaitSec) throws TException {
         waitForProgress(minEmits, componentName, maxWaitSec);
         long emitCount = getAllTimeEmittedCount(componentName);
-        Assert.assertTrue(emitCount >= minEmits, "Count for component " + 
componentName + " is " + emitCount + " min is " + minEmits);
+        Assert.assertTrue(emitCount >= minEmits, "Emit count for component '" + componentName + "' is " + emitCount + ", min is " + minEmits);
     }
 
     public static class ExecutorURL {
@@ -373,7 +373,7 @@ public class TopoWrap {
         return retVal;
     }
 
-    public void killQuietly() {
-        cluster.killSilently(name);
+    public void killOrThrow() throws Exception {
+        cluster.killOrThrow(name);
     }
 }

Reply via email to