This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd05073cc17 KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` 
interface (#12847)
fd05073cc17 is described below

commit fd05073cc17a788d07d85a87c5c4053317a8ffa2
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 2 17:15:29 2022 +0100

    KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface 
(#12847)
    
    This patch adds `syncGroup` to the new `GroupCoordinator` interface and 
updates `KafkaApis` to use it.
    
    Reviewers: Justine Olshan <[email protected]>, Jeff Kim 
<[email protected]>, Jason Gustafson <[email protected]>
---
 .../group/GroupCoordinatorAdapter.scala            |  39 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  72 +++-----
 .../scala/kafka/server/RequestHandlerHelper.scala  |  12 ++
 .../group/GroupCoordinatorAdapterTest.scala        |  72 +++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 185 ++++++++++++---------
 .../kafka/coordinator/group/GroupCoordinator.java  |  17 ++
 6 files changed, 266 insertions(+), 131 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index f2dd34a5379..c86fed815e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -17,11 +17,12 @@
 package kafka.coordinator.group
 
 import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.{HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
SyncGroupRequestData, SyncGroupResponseData}
 import org.apache.kafka.common.requests.RequestContext
 import org.apache.kafka.common.utils.BufferSupplier
 
 import java.util.concurrent.CompletableFuture
+import scala.collection.immutable
 import scala.jdk.CollectionConverters._
 
 /**
@@ -84,6 +85,42 @@ class GroupCoordinatorAdapter(
     future
   }
 
+  override def syncGroup(
+    context: RequestContext,
+    request: SyncGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[SyncGroupResponseData] = {
+    val future = new CompletableFuture[SyncGroupResponseData]()
+
+    def callback(syncGroupResult: SyncGroupResult): Unit = {
+      future.complete(new SyncGroupResponseData()
+        .setErrorCode(syncGroupResult.error.code)
+        .setProtocolType(syncGroupResult.protocolType.orNull)
+        .setProtocolName(syncGroupResult.protocolName.orNull)
+        .setAssignment(syncGroupResult.memberAssignment)
+      )
+    }
+
+    val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
+    request.assignments.forEach { assignment =>
+      assignmentMap += assignment.memberId -> assignment.assignment
+    }
+
+    coordinator.handleSyncGroup(
+      request.groupId,
+      request.generationId,
+      request.memberId,
+      Option(request.protocolType),
+      Option(request.protocolName),
+      Option(request.groupInstanceId),
+      assignmentMap.result(),
+      callback,
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
   override def heartbeat(
     context: RequestContext,
     request: HeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d8cc117d189..df6ae7682c4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.HEARTBEAT => 
handleHeartbeatRequest(request).exceptionally(handleError)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
-        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, 
requestLocal)
+        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
@@ -1660,23 +1660,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[Unit] = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    def sendResponse(response: AbstractResponse): Unit = {
-      trace("Sending join group response %s for correlation id %d to client 
%s."
-        .format(response, request.header.correlationId, 
request.header.clientId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        response.maybeSetThrottleTimeMs(requestThrottleMs)
-        response
-      })
-    }
-
     if (joinGroupRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-      
sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else if (!authHelper.authorize(request.context, READ, GROUP, 
joinGroupRequest.data.groupId)) {
-      
sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
       newGroupCoordinator.joinGroup(
@@ -1685,56 +1676,45 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestLocal.bufferSupplier
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
-          sendResponse(joinGroupRequest.getErrorResponse(exception))
+          requestHelper.sendMaybeThrottle(request, 
joinGroupRequest.getErrorResponse(exception))
         } else {
-          sendResponse(new JoinGroupResponse(response, 
request.context.apiVersion))
+          requestHelper.sendMaybeThrottle(request, new 
JoinGroupResponse(response, request.context.apiVersion))
         }
       }
     }
   }
 
-  def handleSyncGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+  def handleSyncGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val syncGroupRequest = request.body[SyncGroupRequest]
 
-    def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new SyncGroupResponse(
-          new SyncGroupResponseData()
-            .setErrorCode(syncGroupResult.error.code)
-            .setProtocolType(syncGroupResult.protocolType.orNull)
-            .setProtocolName(syncGroupResult.protocolName.orNull)
-            .setAssignment(syncGroupResult.memberAssignment)
-            .setThrottleTimeMs(requestThrottleMs)
-        ))
-    }
-
     if (syncGroupRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-      sendResponseCallback(SyncGroupResult(Errors.UNSUPPORTED_VERSION))
+      requestHelper.sendMaybeThrottle(request, 
syncGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) {
       // Starting from version 5, ProtocolType and ProtocolName fields are 
mandatory.
-      sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
+      requestHelper.sendMaybeThrottle(request, 
syncGroupRequest.getErrorResponse(Errors.INCONSISTENT_GROUP_PROTOCOL.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (!authHelper.authorize(request.context, READ, GROUP, 
syncGroupRequest.data.groupId)) {
-      sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
+      requestHelper.sendMaybeThrottle(request, 
syncGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
-      syncGroupRequest.data.assignments.forEach { assignment =>
-        assignmentMap += (assignment.memberId -> assignment.assignment)
-      }
-
-      groupCoordinator.handleSyncGroup(
-        syncGroupRequest.data.groupId,
-        syncGroupRequest.data.generationId,
-        syncGroupRequest.data.memberId,
-        Option(syncGroupRequest.data.protocolType),
-        Option(syncGroupRequest.data.protocolName),
-        Option(syncGroupRequest.data.groupInstanceId),
-        assignmentMap.result(),
-        sendResponseCallback,
-        requestLocal
-      )
+      newGroupCoordinator.syncGroup(
+        request.context,
+        syncGroupRequest.data,
+        requestLocal.bufferSupplier
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
syncGroupRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new 
SyncGroupResponse(response))
+        }
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala 
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index 5db595986ef..75bd7a32969 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -107,6 +107,18 @@ class RequestHandlerHelper(
 
   // Throttle the channel if the request quota is enabled but has been 
violated. Regardless of throttling, send the
   // response immediately.
+  def sendMaybeThrottle(
+    request: RequestChannel.Request,
+    response: AbstractResponse
+  ): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle non-forwarded requests
+    if (!request.isForwarded)
+      throttle(quotas.request, request, throttleTimeMs)
+    response.maybeSetThrottleTimeMs(throttleTimeMs)
+    requestChannel.sendResponse(request, response, None)
+  }
+
   def sendResponseMaybeThrottle(request: RequestChannel.Request,
                                 createResponse: Int => AbstractResponse): Unit 
= {
     val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index d88e7f44325..bf8077707ba 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -16,9 +16,9 @@
  */
 package kafka.coordinator.group
 
-import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.{HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
SyncGroupRequestData, SyncGroupResponseData}
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
@@ -142,6 +142,74 @@ class GroupCoordinatorAdapterTest {
     assertEquals(expectedData, future.get())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroup(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(ApiKeys.SYNC_GROUP, version)
+    val data = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member1")
+      .setGroupInstanceId("instance")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+      .setGenerationId(10)
+      .setAssignments(List(
+        new SyncGroupRequestData.SyncGroupRequestAssignment()
+          .setMemberId("member1")
+          .setAssignment("member1".getBytes()),
+        new SyncGroupRequestData.SyncGroupRequestAssignment()
+          .setMemberId("member2")
+          .setAssignment("member2".getBytes())
+      ).asJava)
+    val bufferSupplier = BufferSupplier.create()
+
+    val future = adapter.syncGroup(ctx, data, bufferSupplier)
+    assertFalse(future.isDone)
+
+    val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+      ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+    val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+      ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+    verify(groupCoordinator).handleSyncGroup(
+      ArgumentMatchers.eq(data.groupId),
+      ArgumentMatchers.eq(data.generationId),
+      ArgumentMatchers.eq(data.memberId),
+      ArgumentMatchers.eq(Some(data.protocolType)),
+      ArgumentMatchers.eq(Some(data.protocolName)),
+      ArgumentMatchers.eq(Some(data.groupInstanceId)),
+      capturedAssignment.capture(),
+      capturedCallback.capture(),
+      ArgumentMatchers.eq(RequestLocal(bufferSupplier))
+    )
+
+    assertEquals(Map(
+      "member1" -> "member1",
+      "member2" -> "member2",
+    ), capturedAssignment.getValue.map { case (member, metadata) =>
+      (member, new String(metadata))
+    })
+
+    capturedCallback.getValue.apply(SyncGroupResult(
+      error = Errors.NONE,
+      protocolType = Some("consumer"),
+      protocolName = Some("range"),
+      memberAssignment = "member1".getBytes()
+    ))
+
+    val expectedResponseData = new SyncGroupResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+      .setAssignment("member1".getBytes())
+
+    assertTrue(future.isDone)
+    assertEquals(expectedResponseData, future.get())
+  }
+
   @Test
   def testHeartbeat(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 32a0e0aec1a..459a640ae78 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Optional, Properties, Random}
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.{ControllerContext, KafkaController}
-import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
 import kafka.coordinator.group._
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.log.AppendOrigin
@@ -2709,110 +2708,132 @@ class KafkaApisTest {
     assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
   }
 
-  @Test
-  def testSyncGroupProtocolTypeAndName(): Unit = {
-    for (version <- ApiKeys.SYNC_GROUP.oldestVersion to 
ApiKeys.SYNC_GROUP.latestVersion) {
-      testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short])
-    }
-  }
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testHandleSyncGroupRequest(version: Short): Unit = {
+    val syncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+    val requestChannelRequest = buildRequest(new 
SyncGroupRequest.Builder(syncGroupRequest).build(version))
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
+    val expectedSyncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType(if (version >= 5) "consumer" else null)
+      .setProtocolName(if (version >= 5) "range" else null)
 
-    val capturedCallback: ArgumentCaptor[SyncGroupCallback] = 
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+    val future = new CompletableFuture[SyncGroupResponseData]()
+    when(newGroupCoordinator.syncGroup(
+      requestChannelRequest.context,
+      expectedSyncGroupRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
 
-    val requestLocal = RequestLocal.withThreadConfinedCaching
-    val syncGroupRequest = new SyncGroupRequest.Builder(
-      new SyncGroupRequestData()
-        .setGroupId(groupId)
-        .setGenerationId(0)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setProtocolName(protocolName)
-    ).build(version)
+    createKafkaApis().handleSyncGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(syncGroupRequest)
+    val expectedSyncGroupResponse = new SyncGroupResponseData()
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-    createKafkaApis().handleSyncGroupRequest(requestChannelRequest, 
requestLocal)
+    future.complete(expectedSyncGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+    assertEquals(expectedSyncGroupResponse, response.data)
+  }
 
-    verify(groupCoordinator).handleSyncGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(0),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
-      ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(Map.empty),
-      capturedCallback.capture(),
-      ArgumentMatchers.eq(requestLocal)
+  @Test
+  def testHandleSyncGroupRequestFutureFailed(): Unit = {
+    val syncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    val requestChannelRequest = buildRequest(new 
SyncGroupRequest.Builder(syncGroupRequest).build())
+
+    val expectedSyncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    val future = new CompletableFuture[SyncGroupResponseData]()
+    when(newGroupCoordinator.syncGroup(
+      requestChannelRequest.context,
+      expectedSyncGroupRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handleSyncGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(SyncGroupResult(
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      memberAssignment = Array.empty,
-      error = Errors.NONE
-    ))
 
+    future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
-
-    assertEquals(Errors.NONE, response.error)
-    assertArrayEquals(Array.empty[Byte], response.data.assignment)
-    assertEquals(protocolType, response.data.protocolType)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
   }
 
   @Test
-  def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(): Unit = {
-    for (version <- ApiKeys.SYNC_GROUP.oldestVersion to 
ApiKeys.SYNC_GROUP.latestVersion) {
-      
testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleSyncGroupRequestAuthenticationFailed(): Unit = {
+    val syncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): 
Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+    val requestChannelRequest = buildRequest(new 
SyncGroupRequest.Builder(syncGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
 
-    val capturedCallback: ArgumentCaptor[SyncGroupCallback] = 
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+    createKafkaApis(authorizer = Some(authorizer)).handleSyncGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestLocal = RequestLocal.withThreadConfinedCaching
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    val syncGroupRequest = new SyncGroupRequest.Builder(
-      new SyncGroupRequestData()
-        .setGroupId(groupId)
-        .setGenerationId(0)
-        .setMemberId(memberId)
-    ).build(version)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): 
Unit = {
+    val syncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
 
-    val requestChannelRequest = buildRequest(syncGroupRequest)
+    val requestChannelRequest = buildRequest(new 
SyncGroupRequest.Builder(syncGroupRequest).build(version))
+
+    val expectedSyncGroupRequest = new SyncGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
 
-    createKafkaApis().handleSyncGroupRequest(requestChannelRequest, 
requestLocal)
+    val future = new CompletableFuture[SyncGroupResponseData]()
+    when(newGroupCoordinator.syncGroup(
+      requestChannelRequest.context,
+      expectedSyncGroupRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handleSyncGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
     if (version < 5) {
-      verify(groupCoordinator).handleSyncGroup(
-        ArgumentMatchers.eq(groupId),
-        ArgumentMatchers.eq(0),
-        ArgumentMatchers.eq(memberId),
-        ArgumentMatchers.eq(None),
-        ArgumentMatchers.eq(None),
-        ArgumentMatchers.eq(None),
-        ArgumentMatchers.eq(Map.empty),
-        capturedCallback.capture(),
-        ArgumentMatchers.eq(requestLocal))
-      capturedCallback.getValue.apply(SyncGroupResult(
-        protocolType = Some(protocolType),
-        protocolName = Some(protocolName),
-        memberAssignment = Array.empty,
-        error = Errors.NONE
-      ))
+      future.complete(new SyncGroupResponseData()
+        .setProtocolType("consumer")
+        .setProtocolName("range"))
     }
 
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 36935981c6b..70aecf2ccae 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.BufferSupplier;
 
@@ -42,6 +44,21 @@ public interface GroupCoordinator {
         BufferSupplier bufferSupplier
     );
 
+    /**
+     * Sync a Generic Group.
+     *
+     * @param context           The coordinator request context.
+     * @param request           The SyncGroupRequest data.
+     * @param bufferSupplier    The buffer supplier tied to the request 
thread.
+     *
+     * @return A future yielding the response or an exception.
+     */
+    CompletableFuture<SyncGroupResponseData> syncGroup(
+        RequestContext context,
+        SyncGroupRequestData request,
+        BufferSupplier bufferSupplier
+    );
+
     /**
      * Heartbeat to a Generic Group.
      *

Reply via email to