Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7f22203c4 -> c46036930


STORM-2807: Shut down integration test topologies immediately after tests, 
retry killing each topology for a minute and throw an error if it can't be done


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4603693
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4603693
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4603693

Branch: refs/heads/1.x-branch
Commit: c460369305f61bb1bc7aa5f20590f907ffd401dc
Parents: 7f22203
Author: Stig Rohde Døssing <[email protected]>
Authored: Wed Nov 8 21:02:55 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Sat Nov 11 10:45:08 2017 +0100

----------------------------------------------------------------------
 integration-test/README.md                      |  2 +-
 .../window/TumblingTimeCorrectness.java         |  6 ++---
 .../window/TumblingWindowCorrectness.java       |  2 +-
 .../test/java/org/apache/storm/st/DemoTest.java |  2 +-
 .../st/tests/window/SlidingWindowTest.java      |  6 ++---
 .../st/tests/window/TumblingWindowTest.java     |  6 ++---
 .../apache/storm/st/wrapper/StormCluster.java   | 25 +++++++++++++-------
 .../org/apache/storm/st/wrapper/TopoWrap.java   |  6 ++---
 8 files changed, 32 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
index 36c70a1..ab78387 100644
--- a/integration-test/README.md
+++ b/integration-test/README.md
@@ -16,7 +16,7 @@ And runs all the tests against it.
 
 Running tests for development & debugging
 =========================================
-```vagrant up``` command is steup as a complete auto-pilot.
+```vagrant up``` command is setup as a complete auto-pilot.
 Following describes how we can run individual tests against this vagrant 
cluster or any other cluster.
 
 Configs for running

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
index a77836f..7cadd18 100644
--- 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -55,9 +55,9 @@ public class TumblingTimeCorrectness implements 
TestableTopology {
     private final String spoutName;
     private final String boltName;
 
-    public TumblingTimeCorrectness(int timbleSec) {
-        this.tumbleSec = timbleSec;
-        final String prefix = this.getClass().getSimpleName() + "-timbleSec" + 
timbleSec;
+    public TumblingTimeCorrectness(int tumbleSec) {
+        this.tumbleSec = tumbleSec;
+        final String prefix = this.getClass().getSimpleName() + "-tumbleSec" + 
tumbleSec;
         spoutName = prefix + "IncrementingSpout";
         boltName = prefix + "VerificationBolt";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
index 22c6d75..adcb9dd 100644
--- 
a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
+++ 
b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -54,7 +54,7 @@ public class TumblingWindowCorrectness implements 
TestableTopology {
 
     public TumblingWindowCorrectness(final int tumbleSize) {
         this.tumbleSize = tumbleSize;
-        final String prefix = this.getClass().getSimpleName() + "-tubleSize" + 
tumbleSize;
+        final String prefix = this.getClass().getSimpleName() + "-tumbleSize" 
+ tumbleSize;
         spoutName = prefix + "IncrementingSpout";
         boltName = prefix + "VerificationBolt";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java 
b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
index b011254..2c62d3f 100644
--- a/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
+++ b/integration-test/src/test/java/org/apache/storm/st/DemoTest.java
@@ -79,7 +79,7 @@ public final class DemoTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
index c4200a0..29afe7a 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
@@ -66,7 +66,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateCountWindows")
     public void testWindowCount(int windowSize, int slideSize) throws 
Exception {
         final SlidingWindowCorrectness testable = new 
SlidingWindowCorrectness(windowSize, slideSize);
-        final String topologyName = this.getClass().getSimpleName() + "w" + 
windowSize + "s" + slideSize;
+        final String topologyName = this.getClass().getSimpleName() + 
"-window" + windowSize + "-slide" + slideSize;
         if (windowSize <= 0 || slideSize <= 0) {
             try {
                 testable.newTopology();
@@ -130,7 +130,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateTimeWindows")
     public void testTimeWindow(int windowSec, int slideSec) throws Exception {
         final SlidingTimeCorrectness testable = new 
SlidingTimeCorrectness(windowSec, slideSec);
-        final String topologyName = this.getClass().getSimpleName() + "w" + 
windowSec + "s" + slideSec;
+        final String topologyName = this.getClass().getSimpleName() + 
"-window" + windowSec + "-slide" + slideSec;
         if (windowSec <= 0 || slideSec <= 0) {
             try {
                 testable.newTopology();
@@ -181,7 +181,7 @@ public final class SlidingWindowTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
index 450219d..866e4c8 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/tests/window/TumblingWindowTest.java
@@ -48,7 +48,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateWindows")
     public void testTumbleCount(int tumbleSize) throws Exception {
         final TumblingWindowCorrectness testable = new 
TumblingWindowCorrectness(tumbleSize);
-        final String topologyName = this.getClass().getSimpleName() + "t" + 
tumbleSize;
+        final String topologyName = this.getClass().getSimpleName() + "-size" 
+ tumbleSize;
         if (tumbleSize <= 0) {
             try {
                 testable.newTopology();
@@ -77,7 +77,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @Test(dataProvider = "generateTumbleTimes")
     public void testTumbleTime(int tumbleSec) throws Exception {
         final TumblingTimeCorrectness testable = new 
TumblingTimeCorrectness(tumbleSec);
-        final String topologyName = this.getClass().getSimpleName() + "t" + 
tumbleSec;
+        final String topologyName = this.getClass().getSimpleName() + "-sec" + 
tumbleSec;
         if (tumbleSec <= 0) {
             try {
                 testable.newTopology();
@@ -93,7 +93,7 @@ public final class TumblingWindowTest extends AbstractTest {
     @AfterMethod
     public void cleanup() throws Exception {
         if (topo != null) {
-            topo.killQuietly();
+            topo.killOrThrow();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
index 7311d5b..51226e1 100644
--- 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
+++ 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class StormCluster {
     private static Logger log = LoggerFactory.getLogger(StormCluster.class);
@@ -83,13 +84,21 @@ public class StormCluster {
         return new ArrayList<>(filteredSummary);
     }
 
-    public void killSilently(String topologyName) {
-        try {
-            client.killTopologyWithOpts(topologyName, new KillOptions());
-            log.info("Topology killed: " + topologyName);
-        } catch (Throwable e){
-            log.warn("Couldn't kill topology: " + topologyName + " Exception: 
" + ExceptionUtils.getFullStackTrace(e));
+    public void killOrThrow(String topologyName) throws Exception {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + 
TimeUnit.SECONDS.toMillis(60)) {
+            try {
+                KillOptions killOptions = new KillOptions();
+                killOptions.set_wait_secs(0);
+                client.killTopologyWithOpts(topologyName, killOptions);
+                log.info("Topology killed: " + topologyName);
+                return;
+            } catch (Throwable e) {
+                log.warn("Couldn't kill topology: " + topologyName + ", going 
to retry soon. Exception: " + ExceptionUtils.getFullStackTrace(e));
+                Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+            }
         }
+        throw new RuntimeException("Failed to kill topology " + topologyName + 
". Subsequent tests may fail because worker slots are occupied");
     }
 
     public TopologySummary getOneActive() throws TException {
@@ -107,10 +116,10 @@ public class StormCluster {
         return client;
     }
 
-    public void killActiveTopologies() throws TException {
+    public void killActiveTopologies() throws Exception {
         List<TopologySummary> activeTopologies = getActive();
         for (TopologySummary activeTopology : activeTopologies) {
-            killSilently(activeTopology.get_name());
+            killOrThrow(activeTopology.get_name());
         }
 
         AssertUtil.empty(getActive());

http://git-wip-us.apache.org/repos/asf/storm/blob/c4603693/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
----------------------------------------------------------------------
diff --git 
a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java 
b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
index 2be0402..5e58637 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
@@ -223,7 +223,7 @@ public class TopoWrap {
     public void assertProgress(int minEmits, String componentName, int 
maxWaitSec) throws TException {
         waitForProgress(minEmits, componentName, maxWaitSec);
         long emitCount = getAllTimeEmittedCount(componentName);
-        Assert.assertTrue(emitCount >= minEmits, "Count for component " + 
componentName + " is " + emitCount + " min is " + minEmits);
+        Assert.assertTrue(emitCount >= minEmits, "Emit count for component '" 
+ componentName + "' is " + emitCount + ", min is " + minEmits);
     }
 
     public static class ExecutorURL {
@@ -372,7 +372,7 @@ public class TopoWrap {
         return retVal;
     }
 
-    public void killQuietly() {
-        cluster.killSilently(name);
+    public void killOrThrow() throws Exception {
+        cluster.killOrThrow(name);
     }
 }

Reply via email to