Fix flaky, slow integration tests in TestZkStreamProcessor and TestZkStreamProcessorSession
Fix flaky and slow integration tests in TestZkStreamProcessor and TestZkStreamProcessorSession Reason for failures: Thereâre three configurable wait times in rebalancing phase in samza standalone before consensus is acheived and processing resumes with updated jobModel. * debounceTime (Specified by `job.debounce.time.ms`. Upon processor change, leader waits for this interval before generating jobModel expecting stabilization in processors group(new arrival, deletion etc)). * taskShutdownMs (Specified by `task.shutdown.ms`. Wait time for SamzaContainer shutdown in StreamProcessor). * barrierWaitTimeOutMs (Specified by `job.coordinator.zk.consensus.timeout.ms`. Wait time for all processors in the group to join the barrier after creation). Above wait times affects rebalancing phase duration. All these wait time have defaults in order of 40-60 seconds and not set to low values. Flaky tests expects processors to come back up after rebalancing phase and drain message sources(Accomplished by checking a latch.count. RemoteApplicationRunner integration tests does exact same thing by checking if kafka input queue is drained directly with similar logic). In worst case rebalancing phases can last upto 3-4 minutes(Making these tests sometime take 10 minutes at worst case). Change: Set all the above timeouts to 2 seconds(Sufficient for tests and verified by local build). Benefits: * Faster build time(Average runtime of these individual tests were reduced from 1m56s to 14s) * More predicability in assertions(Didnât fail even once in 30-40 attempts locally). NOTE: If this doesnât fix TestZkStreamProcessor and TestZkStreamProcessorSession, longer term fix should be to use message markers in input source and shutdown taskCoordinator upon receiving them from TaskImpl(Or use bounded collection based pluggable InMemorySystemConsumer/InMemorySystemProducer). Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Bharath Kumarasubramanian <codin.mart...@gmail.com>, Navina Ramesh <nav...@apache.org> Closes #260 from shanthoosh/FIX_ZK_PROCESSOR_FLAKY_TESTS Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69dbada6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69dbada6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69dbada6 Branch: refs/heads/0.14.0 Commit: 69dbada6ab8914d4b92fb480b96e2a62ccc8bc51 Parents: 0a4ecb2 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Thu Aug 3 15:48:23 2017 -0700 Committer: navina <nav...@apache.org> Committed: Thu Aug 3 15:48:23 2017 -0700 ---------------------------------------------------------------------- .../samza/processor/TestZkStreamProcessorBase.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/69dbada6/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index f2f1585..c848cde 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -39,8 +39,10 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; @@ -66,6 +68,12 @@ import org.slf4j.LoggerFactory; public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness { + private static final String TASK_SHUTDOWN_MS = "2000"; + private static final String JOB_DEBOUNCE_TIME_MS = "2000"; + private static final String BARRIER_TIMEOUT_MS = "2000"; + private static final String ZK_SESSION_TIMEOUT_MS = "2000"; + private static final String ZK_CONNECTION_TIMEOUT_MS = "2000"; + public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class); public final static int BAD_MESSAGE_KEY = 1000; // to avoid long sleeps, we rather use multiple attempts with shorter sleeps @@ -181,6 +189,11 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); + configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS); + configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS); + configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS); + configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS); + configs.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS); return configs; }