This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b8ed0a KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
3b8ed0a is described below
commit 3b8ed0a194bd1c90917c01ffc659fc05d2683c94
Author: Boyang Chen <[email protected]>
AuthorDate: Sun Apr 5 22:47:38 2020 -0700
KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
As the title suggests, consumers first do an OffsetFetch before starting
normal processing. It makes sense to add it to the concurrency test suite to
verify whether it introduces any blocking behavior.
Reviewers: Guozhang Wang <[email protected]>
---
.../group/GroupCoordinatorConcurrencyTest.scala | 35 ++++++++++++++++++----
1 file changed, 29 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 3bba9e1..50f0f5e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.Time
import org.easymock.EasyMock
import org.junit.Assert._
@@ -53,13 +53,16 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
private val allOperations = Seq(
new JoinGroupOperation,
new SyncGroupOperation,
+ new OffsetFetchOperation,
new CommitOffsetsOperation,
new HeartbeatOperation,
new LeaveGroupOperation
- )
+ )
+
private val allOperationsWithTxn = Seq(
new JoinGroupOperation,
new SyncGroupOperation,
+ new OffsetFetchOperation,
new CommitTxnOffsetsOperation,
new CompleteTxnOperation,
new HeartbeatOperation,
@@ -176,7 +179,7 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
class SyncGroupOperation extends GroupOperation[SyncGroupCallbackParams,
SyncGroupCallback] {
override def responseCallback(responsePromise:
Promise[SyncGroupCallbackParams]): SyncGroupCallback = {
val callback: SyncGroupCallback = syncGroupResult =>
- responsePromise.success(syncGroupResult.memberAssignment,
syncGroupResult.error)
+ responsePromise.success(syncGroupResult.error,
syncGroupResult.memberAssignment)
callback
}
override def runWithCallback(member: GroupMember, responseCallback:
SyncGroupCallback): Unit = {
@@ -189,8 +192,10 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
}
}
override def awaitAndVerify(member: GroupMember): Unit = {
- val result = await(member, DefaultSessionTimeout)
- assertEquals(Errors.NONE, result._2)
+ val result = await(member, DefaultSessionTimeout)
+ assertEquals(Errors.NONE, result._1)
+ assertNotNull(result._2)
+ assertEquals(0, result._2.length)
}
}
@@ -209,6 +214,22 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
}
}
+ class OffsetFetchOperation extends GroupOperation[OffsetFetchCallbackParams,
OffsetFetchCallback] {
+ override def responseCallback(responsePromise:
Promise[OffsetFetchCallbackParams]): OffsetFetchCallback = {
+ val callback: OffsetFetchCallback = (error, offsets) =>
responsePromise.success(error, offsets)
+ callback
+ }
+ override def runWithCallback(member: GroupMember, responseCallback:
OffsetFetchCallback): Unit = {
+ val (error, partitionData) =
groupCoordinator.handleFetchOffsets(member.groupId, requireStable = true, None)
+ responseCallback(error, partitionData)
+ }
+ override def awaitAndVerify(member: GroupMember): Unit = {
+ val result = await(member, 500)
+ assertEquals(Errors.NONE, result._1)
+ assertEquals(Map.empty, result._2)
+ }
+ }
+
class CommitOffsetsOperation extends
GroupOperation[CommitOffsetCallbackParams, CommitOffsetCallback] {
override def responseCallback(responsePromise:
Promise[CommitOffsetCallbackParams]): CommitOffsetCallback = {
val callback: CommitOffsetCallback = offsets =>
responsePromise.success(offsets)
@@ -290,10 +311,12 @@ object GroupCoordinatorConcurrencyTest {
type JoinGroupCallbackParams = JoinGroupResult
type JoinGroupCallback = JoinGroupResult => Unit
- type SyncGroupCallbackParams = (Array[Byte], Errors)
+ type SyncGroupCallbackParams = (Errors, Array[Byte])
type SyncGroupCallback = SyncGroupResult => Unit
type HeartbeatCallbackParams = Errors
type HeartbeatCallback = Errors => Unit
+ type OffsetFetchCallbackParams = (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData])
+ type OffsetFetchCallback = (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData]) => Unit
type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
type LeaveGroupCallbackParams = LeaveGroupResult