Repository: kafka
Updated Branches:
  refs/heads/trunk d32f3f282 -> 106a74560


MINOR: Add test cases for delays in consumer rebalance listener

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #1866 from hachikuji/rebalance-delay-test-cases


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/106a7456
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/106a7456
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/106a7456

Branch: refs/heads/trunk
Commit: 106a7456060750ab0604d290b8c1e33a57adbf20
Parents: d32f3f2
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 22 23:03:49 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Sep 22 23:03:49 2016 +0100

----------------------------------------------------------------------
 .../kafka/api/PlaintextConsumerTest.scala       | 82 ++++++++++++++++++--
 1 file changed, 75 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/106a7456/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c83b54f..d18dc3a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,22 +14,20 @@ package kafka.api
 
 
 import java.util
-import java.util.{Collections, Properties, Locale}
-
 import java.util.regex.Pattern
+import java.util.{Collections, Locale, Properties}
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.serialization.{ByteArraySerializer, 
StringDeserializer, StringSerializer}
-import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer, StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -82,6 +80,76 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testMaxPollIntervalMsDelayInRevocation() {
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
5000.toString)
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false.toString)
+
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
+    var commitCompleted = false
+    var committedPosition: Long = -1
+
+    val listener = new TestConsumerReassignmentListener {
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+        if (callsToRevoked > 0) {
+          // on the second rebalance (after we have joined the group 
initially), sleep longer
+          // than session timeout and then try a commit. We should still be in 
the group,
+          // so the commit should succeed
+          Utils.sleep(1500)
+          committedPosition = consumer0.position(tp)
+          consumer0.commitSync(Map(tp -> new 
OffsetAndMetadata(committedPosition)).asJava)
+          commitCompleted = true
+        }
+        super.onPartitionsRevoked(partitions)
+      }
+    }
+
+    consumer0.subscribe(List(topic).asJava, listener)
+
+    // poll once to join the group and get the initial assignment
+    consumer0.poll(0)
+
+    // force a rebalance to trigger an invocation of the revocation callback 
while in the group
+    consumer0.subscribe(List("otherTopic").asJava, listener)
+    consumer0.poll(0)
+
+    assertEquals(0, committedPosition)
+    assertTrue(commitCompleted)
+  }
+
+  @Test
+  def testMaxPollIntervalMsDelayInAssignment() {
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
5000.toString)
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false.toString)
+
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
+    val listener = new TestConsumerReassignmentListener {
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+        // sleep longer than the session timeout, we should still be in the 
group after invocation
+        Utils.sleep(1500)
+        super.onPartitionsAssigned(partitions)
+      }
+    }
+    consumer0.subscribe(List(topic).asJava, listener)
+
+    // poll once to join the group and get the initial assignment
+    consumer0.poll(0)
+
+    // we should still be in the group after this invocation
+    consumer0.poll(0)
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(1, listener.callsToRevoked)
+  }
+
+  @Test
   def testAutoCommitOnClose() {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())

Reply via email to