jolshan commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024435836
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2690,110 +2689,150 @@ class KafkaApisTest {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
- @Test
- def testSyncGroupProtocolTypeAndName(): Unit = {
- for (version <- ApiKeys.SYNC_GROUP.oldestVersion to
ApiKeys.SYNC_GROUP.latestVersion) {
- testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short])
- }
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testHandleSyncGroupRequest(version: Short): Unit = {
+ val syncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
- def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
- reset(groupCoordinator, clientRequestQuotaManager, requestChannel,
replicaManager)
+ val requestChannelRequest = buildRequest(new
SyncGroupRequest.Builder(syncGroupRequest).build(version))
- val groupId = "group"
- val memberId = "member1"
- val protocolType = "consumer"
- val protocolName = "range"
+ val expectedRequestContext = new GroupCoordinatorRequestContext(
+ version,
+ requestChannelRequest.context.clientId,
+ requestChannelRequest.context.clientAddress,
+ RequestLocal.NoCaching.bufferSupplier
+ )
- val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+ val expectedSyncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType(if (version >= 5) "consumer" else null)
+ .setProtocolName(if (version >= 5) "range" else null)
- val requestLocal = RequestLocal.withThreadConfinedCaching
- val syncGroupRequest = new SyncGroupRequest.Builder(
- new SyncGroupRequestData()
- .setGroupId(groupId)
- .setGenerationId(0)
- .setMemberId(memberId)
- .setProtocolType(protocolType)
- .setProtocolName(protocolName)
- ).build(version)
+ val future = new CompletableFuture[SyncGroupResponseData]()
+ when(newGroupCoordinator.syncGroup(
+ ArgumentMatchers.eq(expectedRequestContext),
+ ArgumentMatchers.eq(expectedSyncGroupRequest)
+ )).thenReturn(future)
- val requestChannelRequest = buildRequest(syncGroupRequest)
+ createKafkaApis().handleSyncGroupRequest(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
- createKafkaApis().handleSyncGroupRequest(requestChannelRequest,
requestLocal)
+ val expectedSyncGroupResponse = new SyncGroupResponseData()
+ .setProtocolType("consumer")
+ .setProtocolName("range")
- verify(groupCoordinator).handleSyncGroup(
- ArgumentMatchers.eq(groupId),
- ArgumentMatchers.eq(0),
- ArgumentMatchers.eq(memberId),
- ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
- ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
- ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(Map.empty),
- capturedCallback.capture(),
- ArgumentMatchers.eq(requestLocal)
+ future.complete(expectedSyncGroupResponse)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
+ val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+ assertEquals(expectedSyncGroupResponse, response.data)
+ }
+
+ @Test
+ def testHandleSyncGroupRequestFutureFailed(): Unit = {
Review Comment:
Just to confirm my understanding: are
`testHandleSyncGroupRequestFutureFailed` and
`testHandleSyncGroupRequestAuthenticationFailed` the tests newly added in this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org