Repository: samza Updated Branches: refs/heads/master d806e9dab -> 33010a731
SAMZA-1487: Disable Flaky Zk Integration tests. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #353 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/33010a73 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/33010a73 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/33010a73 Branch: refs/heads/master Commit: 33010a7314eca58ff6a92c2b1dbee1c331220637 Parents: d806e9d Author: Shanthoosh Venkataraman <[email protected]> Authored: Thu Nov 9 10:24:53 2017 -0800 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Nov 9 10:24:53 2017 -0800 ---------------------------------------------------------------------- .../samza/processor/TestZkStreamProcessor.java | 11 +++++------ .../samza/processor/TestZkStreamProcessorBase.java | 3 +-- .../processor/TestZkStreamProcessorFailures.java | 8 +++----- .../processor/TestZkStreamProcessorSession.java | 5 ++--- .../test/processor/TestZkLocalApplicationRunner.java | 15 +++++++-------- 5 files changed, 18 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java index 7253b29..d5e7221 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java @@ -21,7 +21,6 @@ package org.apache.samza.processor; import java.util.concurrent.CountDownLatch; import org.junit.Assert; -import org.junit.Test; /** @@ -35,17 +34,17 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { return "test_ZK_"; } - @Test + //@Test public void testSingleStreamProcessor() { testStreamProcessor(new String[]{"1"}); } - @Test + //@Test public void testTwoStreamProcessors() { testStreamProcessor(new String[]{"2", "3"}); } - @Test + //@Test public void testFiveStreamProcessors() { testStreamProcessor(new String[]{"4", "5", "6", "7", "8"}); } @@ -98,7 +97,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { verifyNumMessages(outputTopic, messageCount, messageCount); } - @Test + //@Test /** * Similar to the previous tests, but add another processor in the middle */ public void testStreamProcessorWithAdd() { @@ -170,7 +169,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate); } - @Test + //@Test /** * same as other happy path messages, but with one processor removed in the middle */ public void testStreamProcessorWithRemove() { http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index c848cde..5cde446 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -62,7 +62,6 @@ import org.apache.samza.zk.TestZkUtils; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.ZooKeeperServer; import org.junit.Assert; -import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +90,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness return ""; } - @Before +// @Before public void setUp() { super.setUp(); // for each tests - make the common parts unique http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java index 374e77c..540c69b 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java @@ -26,8 +26,6 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; /** @@ -44,12 +42,12 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase { return "test_ZK_failure_"; } - @Before +// @Before public void setUp() { super.setUp(); } - @Test(expected = org.apache.samza.SamzaException.class) + //@Test(expected = org.apache.samza.SamzaException.class) public void testZkUnavailable() { map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout @@ -58,7 +56,7 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase { Assert.fail("should've thrown an exception"); } - @Test + //@Test // Test with a single processor failing. // One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to // throw an exception. http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java index 880d766..40eeaf0 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java @@ -24,7 +24,6 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.zk.ZkJobCoordinator; import org.junit.Assert; -import org.junit.Test; /** @@ -38,7 +37,7 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase { return "test_ZKS_"; } - @Test + //@Test public void testSingleStreamProcessor() { testStreamProcessorWithSessionRestart(new String[]{"1"}); } @@ -49,7 +48,7 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase { testStreamProcessorWithSessionRestart(new String[]{"2", "3"}); } - @Test + //@Test public void testFiveStreamProcessors() { testStreamProcessorWithSessionRestart(new String[]{"4", "5", "6", "7", "8"}); } http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index c550a3b..eb087bb 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -55,7 +55,6 @@ import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkUtils; import org.junit.Rule; -import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; import org.slf4j.Logger; @@ -114,7 +113,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne @Rule public final ExpectedException expectedException = ExpectedException.none(); - @Override +// @Override public void setUp() { super.setUp(); String uniqueTestId = UUID.randomUUID().toString(); @@ -150,7 +149,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } } - @Override +// @Override public void tearDown() { if (zookeeper().zookeeper().isRunning()) { for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) { @@ -198,7 +197,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne return applicationConfig; } - @Test + //@Test public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException { /** * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case(All ssp's from input kafka topic are mapped to a single task). @@ -269,7 +268,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); } - @Test + //@Test public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -323,7 +322,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(2, jobModel.getContainers().size()); } - @Test + //@Test public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -355,7 +354,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne applicationRunner3.run(streamApp3); } - @Test + //@Test public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws Exception { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -427,7 +426,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(jobModel.getContainers(), newJobModel.getContainers()); } - @Test + //@Test public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
