Fix flaky, slow integration tests in TestZkStreamProcessor and 
TestZkStreamProcessorSession

Reason for failures:

Samza standalone has three configurable wait times in the rebalancing phase, 
before consensus is achieved and processing resumes with the updated 
jobModel.

* debounceTime (specified by `job.debounce.time.ms`): upon a change in the 
processor group (new arrival, deletion, etc.), the leader waits for this 
interval before generating the jobModel, expecting the group to stabilize.
* taskShutdownMs (specified by `task.shutdown.ms`): wait time for 
SamzaContainer shutdown in StreamProcessor.
* barrierWaitTimeOutMs (specified by `job.coordinator.zk.consensus.timeout.ms`): 
wait time for all processors in the group to join the barrier after its 
creation.

These wait times determine the duration of the rebalancing phase. All of them 
have defaults on the order of 40-60 seconds, and the tests did not set them to 
lower values.

The flaky tests expect processors to come back up after the rebalancing phase 
and drain the message sources (verified by checking a latch count; the 
RemoteApplicationRunner integration tests do the same thing by checking 
directly whether the Kafka input queue is drained, with similar logic).
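
The latch-based drain check described above can be sketched roughly as follows. This is a minimal illustration in plain Java, not code from the Samza test suite; the class name `LatchDrainCheck` and the method `awaitDrained` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; names are not taken from the Samza code base.
class LatchDrainCheck {

  // Waits until every expected message has been counted down, or the timeout
  // expires. Returns true only if the latch reached zero in time.
  static boolean awaitDrained(CountDownLatch latch, long timeoutMs) {
    try {
      return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }

  public static void main(String[] args) {
    int messageCount = 10;
    CountDownLatch latch = new CountDownLatch(messageCount);
    ExecutorService processor = Executors.newSingleThreadExecutor();
    try {
      // Stand-in for a stream processor: one countDown per processed message.
      for (int i = 0; i < messageCount; i++) {
        processor.execute(latch::countDown);
      }
      System.out.println("drained: " + awaitDrained(latch, 5000));
    } finally {
      processor.shutdownNow();
    }
  }
}
```

If the processors take longer to resume than the timeout the test allows, a check like this returns false even though nothing is actually broken, which is one way long rebalancing waits can surface as flakiness.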

In the worst case, rebalancing phases can last up to 3-4 minutes, which makes 
these tests sometimes take as long as 10 minutes.

Change:

Set all the above timeouts to 2 seconds (sufficient for tests, and verified by 
a local build).
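
As a rough sketch of the override, the three property keys named above can be set on a plain configuration map. The class and method names here (`RebalanceTimeoutOverrides`, `withShortTimeouts`) are hypothetical; the actual change uses the Samza config constants shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; only the property keys and the 2000 ms value
// come from the commit message.
class RebalanceTimeoutOverrides {
  static final String TIMEOUT_MS = "2000";

  // Returns a copy of the base configs with the three rebalancing wait times
  // lowered to 2 seconds. The base map is left unmodified.
  static Map<String, String> withShortTimeouts(Map<String, String> base) {
    Map<String, String> configs = new HashMap<>(base);
    configs.put("job.debounce.time.ms", TIMEOUT_MS);
    configs.put("task.shutdown.ms", TIMEOUT_MS);
    configs.put("job.coordinator.zk.consensus.timeout.ms", TIMEOUT_MS);
    return configs;
  }

  public static void main(String[] args) {
    Map<String, String> configs = withShortTimeouts(new HashMap<>());
    configs.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```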

Benefits:

* Faster build time (the average runtime of these individual tests was reduced 
from 1m56s to 14s).
* More predictability in assertions (the tests didn’t fail even once in 30-40 
local attempts).

NOTE: If this doesn’t fix TestZkStreamProcessor and 
TestZkStreamProcessorSession, the longer-term fix should be to use message 
markers in the input source and shut down the taskCoordinator from TaskImpl 
upon receiving them (or to use a bounded-collection-based pluggable 
InMemorySystemConsumer/InMemorySystemProducer).
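
The marker idea in the note can be illustrated with a small sketch in plain Java. It does not use the Samza task API; the marker value `END_OF_STREAM` and the class `MarkerBoundedConsumer` are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; the marker value and names are assumptions.
class MarkerBoundedConsumer {
  static final String END_OF_STREAM = "__END_OF_STREAM__";

  // Processes messages in order and stops at the first marker. In a real
  // task, the marker branch is where shutdown would be requested.
  static List<String> consumeUntilMarker(Iterable<String> input) {
    List<String> processed = new ArrayList<>();
    for (String message : input) {
      if (END_OF_STREAM.equals(message)) {
        break; // deterministic end of input; no waiting on timeouts
      }
      processed.add(message);
    }
    return processed;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("m1", "m2", END_OF_STREAM, "ignored");
    System.out.println(consumeUntilMarker(input));
  }
}
```

With an explicit marker, a test can assert on exactly the messages that precede it instead of inferring completion from elapsed time.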

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Bharath Kumarasubramanian <codin.mart...@gmail.com>, Navina Ramesh 
<nav...@apache.org>

Closes #260 from shanthoosh/FIX_ZK_PROCESSOR_FLAKY_TESTS


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69dbada6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69dbada6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69dbada6

Branch: refs/heads/0.14.0
Commit: 69dbada6ab8914d4b92fb480b96e2a62ccc8bc51
Parents: 0a4ecb2
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Thu Aug 3 15:48:23 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Thu Aug 3 15:48:23 2017 -0700

----------------------------------------------------------------------
 .../samza/processor/TestZkStreamProcessorBase.java     | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/69dbada6/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index f2f1585..c848cde 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -39,8 +39,10 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -66,6 +68,12 @@ import org.slf4j.LoggerFactory;
 
 
 public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
+  private static final String TASK_SHUTDOWN_MS = "2000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "2000";
+  private static final String BARRIER_TIMEOUT_MS = "2000";
+  private static final String ZK_SESSION_TIMEOUT_MS = "2000";
+  private static final String ZK_CONNECTION_TIMEOUT_MS = "2000";
+
   public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
   public final static int BAD_MESSAGE_KEY = 1000;
   // to avoid long sleeps, we rather use multiple attempts with shorter sleeps
@@ -181,6 +189,11 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
+    configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+    configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
+    configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
+    configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
+    configs.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS);
 
     return configs;
   }
